[GitHub] spark issue #20930: [SPARK-23811][Core] FetchFailed comes before Success of ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20930 **[Test build #89870 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89870/testReport)** for PR 20930 at commit [`fee903c`](https://github.com/apache/spark/commit/fee903c65c59219cdc1c0937ac8be4777142ffbd). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184276403 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) --- End diff -- Yep, you're right. The success completely event in UT was treated as normal success task. 
I fixed this by ignoring this event at the beginning of handleTaskCompletion. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
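For readers following the discussion, here is a minimal, self-contained sketch of the guard described above, using hypothetical, simplified event and scheduler types (this is not the actual Spark patch): a task-success event for a stage attempt that was already failed by an earlier FetchFailed is dropped at the top of the completion handler.

```scala
// Sketch only: hypothetical, simplified stand-ins for DAGScheduler's event handling.
case class CompletionEvent(stageId: Int, stageAttemptId: Int, partitionId: Int, successful: Boolean)

class SchedulerSketch {
  // Stage attempts already marked as failed by a FetchFailed.
  private val failedStageAttempts = scala.collection.mutable.Set.empty[(Int, Int)]

  def markAttemptFailed(stageId: Int, stageAttemptId: Int): Unit =
    failedStageAttempts += ((stageId, stageAttemptId))

  def handleTaskCompletion(event: CompletionEvent): Unit = {
    // Guard at the very beginning: a late Success for an already-failed attempt is ignored,
    // so the partition is not wrongly marked as available for the child stage.
    if (event.successful && failedStageAttempts.contains((event.stageId, event.stageAttemptId))) {
      return
    }
    // ... normal completion handling would continue here ...
  }
}
```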
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184274946 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +assertDataStructuresEmpty() --- End diff -- Ah, it's used for check job successful complete and all temp structure empty. 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21138: [SPARK-24062][Thrift Server] Fix SASL encryption ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21138 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21138: [SPARK-24062][Thrift Server] Fix SASL encryption cannot ...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/21138 Merging to master and branch 2.3. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21160: [SPARK-24094][SS][MINOR] Change description strings of v...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21160 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89866/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21160: [SPARK-24094][SS][MINOR] Change description strings of v...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21160 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21160: [SPARK-24094][SS][MINOR] Change description strings of v...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21160 **[Test build #89866 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89866/testReport)** for PR 21160 at commit [`c008809`](https://github.com/apache/spark/commit/c0088093768798627d188423bf414d4987442255). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21156: [SPARK-24087][SQL] Avoid shuffle when join keys are a su...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21156 **[Test build #89869 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89869/testReport)** for PR 21156 at commit [`a59c94f`](https://github.com/apache/spark/commit/a59c94f5b655fc034ce8907b98022cacf6bf318e). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21138: [SPARK-24062][Thrift Server] Fix SASL encryption cannot ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21138 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89864/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21138: [SPARK-24062][Thrift Server] Fix SASL encryption cannot ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21138 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21138: [SPARK-24062][Thrift Server] Fix SASL encryption cannot ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21138 **[Test build #89864 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89864/testReport)** for PR 21138 at commit [`0077685`](https://github.com/apache/spark/commit/00776858c2e776f46dbe542effe52a19283e752f). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21011: [SPARK-23916][SQL] Add array_join function
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21011 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21011: [SPARK-23916][SQL] Add array_join function
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21011 Thanks! merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20146 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20146 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2685/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20146: [SPARK-11215][ML] Add multiple columns support to String...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20146 **[Test build #89868 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89868/testReport)** for PR 20146 at commit [`c33cff6`](https://github.com/apache/spark/commit/c33cff6c712bc6cdf03c3d6ac52d12573a70bac6). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21028: [SPARK-23922][SQL] Add arrays_overlap function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21028#discussion_r184266872 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -288,6 +288,114 @@ case class ArrayContains(left: Expression, right: Expression) override def prettyName: String = "array_contains" } +/** + * Checks if the two arrays contain at least one common element. + */ +@ExpressionDescription( + usage = "_FUNC_(a1, a2) - Returns true if a1 contains at least an element present also in a2.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(3, 4, 5)); + true + """, since = "2.4.0") +case class ArraysOverlap(left: Expression, right: Expression) + extends BinaryExpression with ImplicitCastInputTypes { + + private lazy val elementType = inputTypes.head.asInstanceOf[ArrayType].elementType + + override def dataType: DataType = BooleanType + + override def inputTypes: Seq[AbstractDataType] = left.dataType match { --- End diff -- There are similar functions, such as `array_union`(#21061), `array_intersect`(#21102), `array_except`(#21103), and maybe `concat`(#20858) which is slightly different though, to handle two (or more) arrays with the same element type. I think we should use the same way to specify and check input types. I'd like to discuss the best way for it here or somewhere else. cc @kiszk @mn-mikke Do you have any suggestions? Also cc @gatorsmile @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
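As a discussion aid only, here is one possible shape for a shared input-type check for such two-array functions. The object name and placement are hypothetical, not Spark's API; the sketch only assumes the standard `ArrayType`/`DataType` classes.

```scala
// Hedged sketch: a shared check that two inputs are arrays with the same element type.
import org.apache.spark.sql.types.{ArrayType, DataType}

object SameElementTypeCheck {
  /** Returns Right(elementType) if both inputs are arrays of the same element type. */
  def check(prettyName: String, left: DataType, right: DataType): Either[String, DataType] =
    (left, right) match {
      case (ArrayType(lt, _), ArrayType(rt, _)) if lt.sameType(rt) => Right(lt)
      case (ArrayType(lt, _), ArrayType(rt, _)) =>
        Left(s"$prettyName requires both arrays to share an element type, " +
          s"got ${lt.simpleString} and ${rt.simpleString}")
      case _ =>
        Left(s"$prettyName only supports array inputs")
    }
}
```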
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184266100 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) --- End diff -- and it seems Spark will wrongly isssue a job end event, can you check it in the test? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184265979 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) --- End diff -- > INFO DAGScheduler: ResultStage 1 () finished in 0.136 s This is unexpected, isn't it? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19404: [SPARK-21760] [Streaming] Fix for Structured stre...
Github user rekhajoshm closed the pull request at: https://github.com/apache/spark/pull/19404 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19404: [SPARK-21760] [Streaming] Fix for Structured streaming t...
Github user rekhajoshm commented on the issue: https://github.com/apache/spark/pull/19404 Thanks for the good inputs. Closing this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21162: shaded guava is not used anywhere, seems guava is not sh...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21162 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21162: shaded guava is not used anywhere, seems guava is not sh...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21162 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21162: shaded guava is not used anywhere, seems guava is...
GitHub user yileic opened a pull request: https://github.com/apache/spark/pull/21162 shaded guava is not used anywhere, seems guava is not shaded anymore ## What changes were proposed in this pull request? remove unused code to reduce confusion (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/yileic/spark patch-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21162.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21162 commit d75b8bcc1c1b38a9cd04c81153cf1b1b865c5069 Author: yileic Date: 2018-04-26T03:36:40Z shaded guava is not used anywhere, seems guava is not shaded anymore --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21021: [SPARK-23921][SQL] Add array_sort function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21021#discussion_r184262594 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -117,47 +118,18 @@ case class MapValues(child: Expression) } /** - * Sorts the input array in ascending / descending order according to the natural ordering of - * the array elements and returns it. + * Common base class for [[SortArray]] and [[ArraySort]]. */ -// scalastyle:off line.size.limit -@ExpressionDescription( - usage = "_FUNC_(array[, ascendingOrder]) - Sorts the input array in ascending or descending order according to the natural ordering of the array elements.", - examples = """ -Examples: - > SELECT _FUNC_(array('b', 'd', 'c', 'a'), true); - ["a","b","c","d"] - """) -// scalastyle:on line.size.limit -case class SortArray(base: Expression, ascendingOrder: Expression) - extends BinaryExpression with ExpectsInputTypes with CodegenFallback { - - def this(e: Expression) = this(e, Literal(true)) - - override def left: Expression = base - override def right: Expression = ascendingOrder - override def dataType: DataType = base.dataType - override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, BooleanType) +trait ArraySortUtil extends ExpectsInputTypes with CodegenFallback { + protected def arrayExpression: Expression - override def checkInputDataTypes(): TypeCheckResult = base.dataType match { -case ArrayType(dt, _) if RowOrdering.isOrderable(dt) => - ascendingOrder match { -case Literal(_: Boolean, BooleanType) => - TypeCheckResult.TypeCheckSuccess -case _ => - TypeCheckResult.TypeCheckFailure( -"Sort order in second argument requires a boolean literal.") - } -case ArrayType(dt, _) => - TypeCheckResult.TypeCheckFailure( -s"$prettyName does not support sorting array of type ${dt.simpleString}") -case _ => - TypeCheckResult.TypeCheckFailure(s"$prettyName only supports array input.") - } + // If -1, place null element at the end of the array + // If 1, place null element at the beginning of the array + protected def nullOrder: Int --- End diff -- `nullOrder: NullOrder`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21021: [SPARK-23921][SQL] Add array_sort function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21021#discussion_r184262552 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -300,6 +333,49 @@ case class Reverse(child: Expression) extends UnaryExpression with ImplicitCastI override def prettyName: String = "reverse" } +/** + * Sorts the input array in ascending order according to the natural ordering of + * the array elements and returns it. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ +_FUNC_(array) - Sorts the input array in ascending order. The elements of the input array must + be orderable. Null elements will be placed at the end of the returned array. + """, + examples = """ +Examples: + > SELECT _FUNC_(array('b', 'd', null, 'c', 'a')); + ["a","b","c","d",null] + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraySort(child: Expression) extends UnaryExpression with ArraySortUtil { + + override def dataType: DataType = child.dataType + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) + + override def arrayExpression: Expression = child + override def nullOrder: Int = NullOrder.Least --- End diff -- `nullOrder: NullOrder`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21021: [SPARK-23921][SQL] Add array_sort function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21021#discussion_r184262545 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -191,24 +163,85 @@ case class SortArray(base: Expression, ascendingOrder: Expression) if (o1 == null && o2 == null) { 0 } else if (o1 == null) { - 1 + 1 * nullOrder } else if (o2 == null) { - -1 + -1 * nullOrder } else { -ordering.compare(o1, o2) } } } } - override def nullSafeEval(array: Any, ascending: Any): Any = { -val elementType = base.dataType.asInstanceOf[ArrayType].elementType + def sortEval(array: Any, ascending: Boolean): Any = { +val elementType = arrayExpression.dataType.asInstanceOf[ArrayType].elementType val data = array.asInstanceOf[ArrayData].toArray[AnyRef](elementType) if (elementType != NullType) { - java.util.Arrays.sort(data, if (ascending.asInstanceOf[Boolean]) lt else gt) + java.util.Arrays.sort(data, if (ascending) lt else gt) } new GenericArrayData(data.asInstanceOf[Array[Any]]) } +} + +object ArraySortUtil { + type NullOrder = Int + object NullOrder { +val Least: NullOrder = -1 +val Greatest: NullOrder = 1 + } +} + +/** + * Sorts the input array in ascending / descending order according to the natural ordering of + * the array elements and returns it. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ +_FUNC_(array[, ascendingOrder]) - Sorts the input array in ascending or descending order + according to the natural ordering of the array elements. Null elements will be placed + at the beginning of the returned array in ascending order or at the end of the returned + array in descending order. + """, + examples = """ +Examples: + > SELECT _FUNC_(array('b', 'd', null, 'c', 'a'), true); + [null,"a","b","c","d"] + """) +// scalastyle:on line.size.limit +case class SortArray(base: Expression, ascendingOrder: Expression) + extends BinaryExpression with ArraySortUtil { + + def this(e: Expression) = this(e, Literal(true)) + + override def left: Expression = base + override def right: Expression = ascendingOrder + override def dataType: DataType = base.dataType + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, BooleanType) + + override def arrayExpression: Expression = base + override def nullOrder: Int = NullOrder.Greatest --- End diff -- `nullOrder: NullOrder`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21021: [SPARK-23921][SQL] Add array_sort function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21021#discussion_r184261968 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -168,9 +140,9 @@ case class SortArray(base: Expression, ascendingOrder: Expression) if (o1 == null && o2 == null) { 0 } else if (o1 == null) { - -1 + -1 * nullOrder --- End diff -- `nullOrder`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21021: [SPARK-23921][SQL] Add array_sort function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21021#discussion_r184261902 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -191,24 +163,85 @@ case class SortArray(base: Expression, ascendingOrder: Expression) if (o1 == null && o2 == null) { 0 } else if (o1 == null) { - 1 + 1 * nullOrder } else if (o2 == null) { - -1 + -1 * nullOrder } else { -ordering.compare(o1, o2) } } } } - override def nullSafeEval(array: Any, ascending: Any): Any = { -val elementType = base.dataType.asInstanceOf[ArrayType].elementType + def sortEval(array: Any, ascending: Boolean): Any = { +val elementType = arrayExpression.dataType.asInstanceOf[ArrayType].elementType val data = array.asInstanceOf[ArrayData].toArray[AnyRef](elementType) if (elementType != NullType) { - java.util.Arrays.sort(data, if (ascending.asInstanceOf[Boolean]) lt else gt) + java.util.Arrays.sort(data, if (ascending) lt else gt) } new GenericArrayData(data.asInstanceOf[Array[Any]]) } +} + +object ArraySortUtil { + type NullOrder = Int + object NullOrder { +val Least: NullOrder = -1 +val Greatest: NullOrder = 1 + } +} + +/** + * Sorts the input array in ascending / descending order according to the natural ordering of + * the array elements and returns it. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ +_FUNC_(array[, ascendingOrder]) - Sorts the input array in ascending or descending order + according to the natural ordering of the array elements. Null elements will be placed + at the beginning of the returned array in ascending order or at the end of the returned + array in descending order. + """, + examples = """ +Examples: + > SELECT _FUNC_(array('b', 'd', null, 'c', 'a'), true); + [null,"a","b","c","d"] + """) +// scalastyle:on line.size.limit +case class SortArray(base: Expression, ascendingOrder: Expression) + extends BinaryExpression with ArraySortUtil { + + def this(e: Expression) = this(e, Literal(true)) + + override def left: Expression = base + override def right: Expression = ascendingOrder + override def dataType: DataType = base.dataType + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, BooleanType) + + override def arrayExpression: Expression = base + override def nullOrder: Int = NullOrder.Greatest --- End diff -- `Least`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21021: [SPARK-23921][SQL] Add array_sort function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21021#discussion_r184262013 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -191,24 +163,85 @@ case class SortArray(base: Expression, ascendingOrder: Expression) if (o1 == null && o2 == null) { 0 } else if (o1 == null) { - 1 + 1 * nullOrder --- End diff -- `-nullOrder`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21021: [SPARK-23921][SQL] Add array_sort function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21021#discussion_r184262044 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -191,24 +163,85 @@ case class SortArray(base: Expression, ascendingOrder: Expression) if (o1 == null && o2 == null) { 0 } else if (o1 == null) { - 1 + 1 * nullOrder } else if (o2 == null) { - -1 + -1 * nullOrder --- End diff -- `nullOrder`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21021: [SPARK-23921][SQL] Add array_sort function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21021#discussion_r184261996 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -168,9 +140,9 @@ case class SortArray(base: Expression, ascendingOrder: Expression) if (o1 == null && o2 == null) { 0 } else if (o1 == null) { - -1 + -1 * nullOrder } else if (o2 == null) { - 1 + 1 * nullOrder --- End diff -- `-nullOrder`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21021: [SPARK-23921][SQL] Add array_sort function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21021#discussion_r184262129 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -117,47 +118,18 @@ case class MapValues(child: Expression) } /** - * Sorts the input array in ascending / descending order according to the natural ordering of - * the array elements and returns it. + * Common base class for [[SortArray]] and [[ArraySort]]. */ -// scalastyle:off line.size.limit -@ExpressionDescription( - usage = "_FUNC_(array[, ascendingOrder]) - Sorts the input array in ascending or descending order according to the natural ordering of the array elements.", - examples = """ -Examples: - > SELECT _FUNC_(array('b', 'd', 'c', 'a'), true); - ["a","b","c","d"] - """) -// scalastyle:on line.size.limit -case class SortArray(base: Expression, ascendingOrder: Expression) - extends BinaryExpression with ExpectsInputTypes with CodegenFallback { - - def this(e: Expression) = this(e, Literal(true)) - - override def left: Expression = base - override def right: Expression = ascendingOrder - override def dataType: DataType = base.dataType - override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, BooleanType) +trait ArraySortUtil extends ExpectsInputTypes with CodegenFallback { + protected def arrayExpression: Expression - override def checkInputDataTypes(): TypeCheckResult = base.dataType match { -case ArrayType(dt, _) if RowOrdering.isOrderable(dt) => - ascendingOrder match { -case Literal(_: Boolean, BooleanType) => - TypeCheckResult.TypeCheckSuccess -case _ => - TypeCheckResult.TypeCheckFailure( -"Sort order in second argument requires a boolean literal.") - } -case ArrayType(dt, _) => - TypeCheckResult.TypeCheckFailure( -s"$prettyName does not support sorting array of type ${dt.simpleString}") -case _ => - TypeCheckResult.TypeCheckFailure(s"$prettyName only supports array input.") - } + // If -1, place null element at the end of the array + // If 1, place null element at the beginning of the array --- End diff -- These comments are not correct now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21021: [SPARK-23921][SQL] Add array_sort function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21021#discussion_r184261947 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -300,6 +333,49 @@ case class Reverse(child: Expression) extends UnaryExpression with ImplicitCastI override def prettyName: String = "reverse" } +/** + * Sorts the input array in ascending order according to the natural ordering of + * the array elements and returns it. + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ +_FUNC_(array) - Sorts the input array in ascending order. The elements of the input array must + be orderable. Null elements will be placed at the end of the returned array. + """, + examples = """ +Examples: + > SELECT _FUNC_(array('b', 'd', null, 'c', 'a')); + ["a","b","c","d",null] + """, + since = "2.4.0") +// scalastyle:on line.size.limit +case class ArraySort(child: Expression) extends UnaryExpression with ArraySortUtil { + + override def dataType: DataType = child.dataType + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType) + + override def arrayExpression: Expression = child + override def nullOrder: Int = NullOrder.Least --- End diff -- `Greatest`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
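For context, here is a small, self-contained sketch of the `NullOrder` idea being reviewed above: a single null-aware comparator parameterized by where nulls should sort, so the ascending/descending and nulls-first/nulls-last variants can share code. The constants mirror the quoted patch (`Least = -1`, `Greatest = 1`); which constant each expression should use is exactly what the comments above are questioning, so treat the wiring here as illustrative only.

```scala
// Hedged sketch of a nullOrder-parameterized comparator (not the actual Spark code).
object NullOrderSketch {
  type NullOrder = Int
  val Least: NullOrder = -1     // one natural reading: nulls compare as smaller than any value
  val Greatest: NullOrder = 1   // nulls compare as greater than any value

  def comparator(nullOrder: NullOrder): Ordering[Integer] = new Ordering[Integer] {
    override def compare(o1: Integer, o2: Integer): Int =
      if (o1 == null && o2 == null) 0
      else if (o1 == null) nullOrder        // null vs non-null decided purely by nullOrder
      else if (o2 == null) -nullOrder
      else o1.compareTo(o2)
  }

  def main(args: Array[String]): Unit = {
    val data = Array[Integer](3, null, 1)
    // ascending, nulls last (array_sort-style behavior described in the patch docs)
    println(data.sorted(comparator(Greatest)).mkString(","))   // 1,3,null
    // ascending, nulls first (sort_array-style behavior described in the patch docs)
    println(data.sorted(comparator(Least)).mkString(","))      // null,1,3
  }
}
```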
[GitHub] spark pull request #21008: [SPARK-23902][SQL] Add roundOff flag to months_be...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21008 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21008: [SPARK-23902][SQL] Add roundOff flag to months_between
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21008 Thanks! merging to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184260815 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. 
+assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +assertDataStructuresEmpty() --- End diff -- I mean the last line, `assertDataStructuresEmpty` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184260597 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. 
+assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) --- End diff -- The successful task will be ignored by `OutputCommitCoordinator.taskCompleted`: in the taskCompleted logic, `stageStates.getOrElse` hits its default branch and returns early because the current stage is in the failed set. The detailed log is provided below:
```
18/04/26 10:50:24.524 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Resubmitting ShuffleMapStage 0 (RDD at DAGSchedulerSuite.scala:74) and ResultStage 1 () due to fetch failure
18/04/26 10:50:24.535 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGSchedulerSuite$$anon$6: Increasing epoch to 2
18/04/26 10:50:24.538 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Executor lost: exec-hostA (epoch 1)
18/04/26 10:50:24.540 ScalaTest-run-running-DAGSchedulerSuite INFO DAGScheduler: Shuffle files lost for executor: exec-hostA (epoch 1)
18/04/26 10:50:24.545 ScalaTest-run-running-DAGSchedulerSuite DEBUG DAGSchedulerSuite$$anon$6: Increasing epoch to 3
18/04/26 10:50:24.552
```
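To make the early-return pattern mentioned above concrete, here is a hedged, self-contained sketch with hypothetical, heavily simplified signatures (not the real `OutputCommitCoordinator` API): once the failed stage's state has been dropped, a late completion event for it falls through the `getOrElse` default and is ignored.

```scala
// Sketch only: simplified stand-in for the getOrElse-and-return pattern described above.
class CoordinatorSketch {
  private case class StageState(var authorizedCommitters: Map[Int, Int] = Map.empty)
  private val stageStates = scala.collection.mutable.Map.empty[Int, StageState]

  def stageStart(stage: Int): Unit = stageStates(stage) = StageState()
  def stageEnd(stage: Int): Unit = stageStates -= stage   // e.g. after the stage has failed

  def taskCompleted(stage: Int, partition: Int, attempt: Int, successful: Boolean): Unit = {
    val state = stageStates.getOrElse(stage, {
      // Stage no longer tracked (already failed/finished): ignore the late completion.
      return
    })
    // ... normal bookkeeping against `state` would follow here ...
    if (successful) state.authorizedCommitters += (partition -> attempt)
  }
}
```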
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184260210 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. 
+assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +assertDataStructuresEmpty() --- End diff -- I added this test to answer your previous question, "Can you simulate what happens to the result task if FetchFailed comes before task success?". This test can pass even without my change to DAGScheduler. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21040: [SPARK-23930][SQL] Add slice function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21040#discussion_r184257279 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CollectionExpressionsSuite.scala --- @@ -105,4 +105,28 @@ class CollectionExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper checkEvaluation(ArrayContains(a3, Literal("")), null) checkEvaluation(ArrayContains(a3, Literal.create(null, StringType)), null) } + + test("Slice") { +val a0 = Literal.create(Seq(1, 2, 3, 4, 5, 6), ArrayType(IntegerType)) +val a1 = Literal.create(Seq[String]("a", "b", "c", "d"), ArrayType(StringType)) +val a2 = Literal.create(Seq[String]("", null, "a", "b"), ArrayType(StringType)) + +checkEvaluation(Slice(a0, Literal(1), Literal(2)), Seq(1, 2)) +checkEvaluation(Slice(a0, Literal(-3), Literal(2)), Seq(4, 5)) +checkEvaluation(Slice(a0, Literal(4), Literal(10)), Seq(4, 5, 6)) +checkEvaluation(Slice(a0, Literal(-1), Literal(2)), Seq(6)) +checkExceptionInExpression[RuntimeException](Slice(a0, Literal(1), Literal(-1)), + "Unexpected value for length") +checkExceptionInExpression[RuntimeException](Slice(a0, Literal(0), Literal(1)), + "Unexpected value for start") +checkEvaluation(Slice(a0, Literal(-20), Literal(1)), Seq.empty[Int]) +checkEvaluation(Slice(a0, Literal.create(null, IntegerType), Literal(2)), null) +checkEvaluation(Slice(a0, Literal(2), Literal.create(null, IntegerType)), null) +checkEvaluation(Slice(Literal.create(null, ArrayType(IntegerType)), Literal(1), Literal(2)), + null) --- End diff -- And also can you add a case for nullable primitive array like `Slice(Seq(1, 2, null, 4), 2, 3)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
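One possible shape for the requested case, written as a fragment for the quoted CollectionExpressionsSuite (it assumes the suite's existing `Slice`/`Literal`/`checkEvaluation` helpers and imports, so it is not standalone). The expected value simply assumes the null element is kept in place by the slice.

```scala
// Hypothetical test fragment for the quoted suite, not yet part of the PR.
val arrWithNull = Literal.create(Seq(1, 2, null, 4), ArrayType(IntegerType))
checkEvaluation(Slice(arrWithNull, Literal(2), Literal(3)), Seq(2, null, 4))
```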
[GitHub] spark pull request #21040: [SPARK-23930][SQL] Add slice function
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21040#discussion_r184257274 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -287,3 +287,101 @@ case class ArrayContains(left: Expression, right: Expression) override def prettyName: String = "array_contains" } + + +/** + * Slices an array according to the requested start index and length + */ +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = "_FUNC_(a1, a2) - Subsets array x starting from index start (or starting from the end if start is negative) with the specified length.", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3, 4), 2, 2); + [2,3] + > SELECT _FUNC_(array(1, 2, 3, 4), -2, 2); + [3,4] + """, since = "2.4.0") +// scalastyle:on line.size.limit +case class Slice(x: Expression, start: Expression, length: Expression) + extends TernaryExpression with ImplicitCastInputTypes { + + override def dataType: DataType = x.dataType + + override def inputTypes: Seq[AbstractDataType] = Seq(ArrayType, IntegerType, IntegerType) + + override def nullable: Boolean = children.exists(_.nullable) + + override def foldable: Boolean = children.forall(_.foldable) + + override def children: Seq[Expression] = Seq(x, start, length) + + override def nullSafeEval(xVal: Any, startVal: Any, lengthVal: Any): Any = { +val startInt = startVal.asInstanceOf[Int] +val lengthInt = lengthVal.asInstanceOf[Int] +val arr = xVal.asInstanceOf[ArrayData] +val startIndex = if (startInt == 0) { + throw new RuntimeException( +s"Unexpected value for start in function $prettyName: SQL array indices start at 1.") +} else if (startInt < 0) { + startInt + arr.numElements() +} else { + startInt - 1 +} +if (lengthInt < 0) { + throw new RuntimeException(s"Unexpected value for length in function $prettyName: " + +s"length must be greater than or equal to 0.") +} +// this can happen if start is negative and its absolute value is greater than the +// number of elements in the array +if (startIndex < 0) { + return new GenericArrayData(Array.empty[AnyRef]) +} +val elementType = x.dataType.asInstanceOf[ArrayType].elementType +val data = arr.toArray[AnyRef](elementType) +new GenericArrayData(data.slice(startIndex, startIndex + lengthInt)) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val elementType = x.dataType.asInstanceOf[ArrayType].elementType +nullSafeCodeGen(ctx, ev, (x, start, length) => { + val arrayClass = classOf[GenericArrayData].getName + val values = ctx.freshName("values") + val i = ctx.freshName("i") + val startIdx = ctx.freshName("startIdx") + val resLength = ctx.freshName("resLength") + val defaultIntValue = CodeGenerator.defaultValue(CodeGenerator.JAVA_INT, false) + s""" + |${CodeGenerator.JAVA_INT} $startIdx = $defaultIntValue; + |${CodeGenerator.JAVA_INT} $resLength = $defaultIntValue; + |if ($start == 0) { + | throw new RuntimeException("Unexpected value for start in function $prettyName: " + |+ "SQL array indices start at 1."); + |} else if ($start < 0) { + | $startIdx = $start + $x.numElements(); + |} else { + | // arrays in SQL are 1-based instead of 0-based + | $startIdx = $start - 1; + |} + |if ($length < 0) { + | throw new RuntimeException("Unexpected value for length in function $prettyName: " + |+ "length must be greater than or equal to 0."); + |} else if ($length > $x.numElements() - $startIdx) { + | $resLength = $x.numElements() - $startIdx; + |} else { + | $resLength = $length; + |} + 
|Object[] $values; + |if ($startIdx < 0) { + | $values = new Object[0]; + |} else { + | $values = new Object[$resLength]; + | for (int $i = 0; $i < $resLength; $i ++) { + |$values[$i] = ${CodeGenerator.getValue(x, elementType, s"$i + $startIdx")}; --- End diff -- I might be missing something, but it seems like `CreateArray` uses different codegen paths for primitive arrays and for other element types, and I guess `GenerateSafeProjection` uses `Object[]` on purpose to create `GenericArrayData` to be "safe" (i.e. to avoid using `UnsafeXxx`). I think we should modify this codegen to avoid boxing. WDYT?
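To make the boxing concern concrete, here is a minimal Scala sketch (illustrative only, not the actual Catalyst codegen; `SliceSketch` and its methods are made-up names) contrasting a result built through `Object[]`, where primitive elements end up boxed, with one that stays on a primitive array:

```scala
object SliceSketch {
  // Boxed path: every Int element is held as a java.lang.Integer reference,
  // which is roughly what the generated Object[] loop above produces.
  def sliceBoxed(arr: Array[AnyRef], startIdx: Int, length: Int): Array[AnyRef] =
    arr.slice(startIdx, startIdx + length)

  // Primitive path: no per-element boxing, just a bulk copy of ints.
  def slicePrimitive(arr: Array[Int], startIdx: Int, length: Int): Array[Int] = {
    val resLength = math.max(0, math.min(length, arr.length - startIdx))
    val values = new Array[Int](resLength)
    System.arraycopy(arr, startIdx, values, 0, resLength)
    values
  }
}
```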
[GitHub] spark issue #21161: [SPARK-21645]left outer join synchronize the condition f...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21161 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21118 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89863/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21118 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21161: left outer join synchronize the condition from left tabl...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21161 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21118 **[Test build #89863 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89863/testReport)** for PR 21118 at commit [`16f1b6e`](https://github.com/apache/spark/commit/16f1b6e7fd8b658feabf25d7c1a354390dcd8eaa). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21161: left outer join synchronize the condition from le...
GitHub user shining1989 opened a pull request: https://github.com/apache/spark/pull/21161 left outer join synchronize the condition from left table to right ## What changes were proposed in this pull request? (Please fill in changes proposed in this fix) ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. You can merge this pull request into a Git repository by running: $ git pull https://github.com/shining1989/spark my_change Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21161.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21161 commit d6f24ee52555615f31c00948f4c33210712f58a8 Author: shining1989 <785294663@...> Date: 2018-04-26T02:20:36Z left outer join synchronize the condition from left table to right --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21156: [SPARK-24087][SQL] Avoid shuffle when join keys are a su...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21156 **[Test build #89867 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89867/testReport)** for PR 21156 at commit [`b6bfdc2`](https://github.com/apache/spark/commit/b6bfdc21ed8edf98f9a3b9ac1c253c59adb141a2). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184255020 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) --- End diff -- where is the code in `DAGScheduler` we ignore this task? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20930: [SPARK-23811][Core] FetchFailed comes before Succ...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20930#discussion_r184254856 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2399,6 +2399,84 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi } } + /** + * This tests the case where origin task success after speculative task got FetchFailed + * before. + */ + test("SPARK-23811: ShuffleMapStage failed by FetchFailed should ignore following " + +"successful tasks") { +// Create 3 RDDs with shuffle dependencies on each other: rddA <--- rddB <--- rddC +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId + +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +val shuffleDepB = new ShuffleDependency(rddB, new HashPartitioner(2)) + +val rddC = new MyRDD(sc, 2, List(shuffleDepB), tracker = mapOutputTracker) + +submit(rddC, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task success +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Check currently missing partition. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) +// The second result task self success soon. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +// Missing partition number should not change, otherwise it will cause child stage +// never succeed. + assert(mapOutputTracker.findMissingPartitions(shuffleDepB.shuffleId).get.size === 1) + } + + test("SPARK-23811: check ResultStage failed by FetchFailed can ignore following " + +"successful tasks") { +val rddA = new MyRDD(sc, 2, Nil) +val shuffleDepA = new ShuffleDependency(rddA, new HashPartitioner(2)) +val shuffleIdA = shuffleDepA.shuffleId +val rddB = new MyRDD(sc, 2, List(shuffleDepA), tracker = mapOutputTracker) +submit(rddB, Array(0, 1)) + +// Complete both tasks in rddA. +assert(taskSets(0).stageId === 0 && taskSets(0).stageAttemptId === 0) +complete(taskSets(0), Seq( + (Success, makeMapStatus("hostA", 2)), + (Success, makeMapStatus("hostB", 2 + +// The first task of rddB success +assert(taskSets(1).tasks(0).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(0), Success, makeMapStatus("hostB", 2))) + +// The second task's speculative attempt fails first, but task self still running. +// This may caused by ExecutorLost. +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), + FetchFailed(makeBlockManagerId("hostA"), shuffleIdA, 0, 0, "ignored"), + null)) +// Make sure failedStage is not empty now +assert(scheduler.failedStages.nonEmpty) +// The second result task self success soon. +assert(taskSets(1).tasks(1).isInstanceOf[ResultTask[_, _]]) +runEvent(makeCompletionEvent( + taskSets(1).tasks(1), Success, makeMapStatus("hostB", 2))) +assertDataStructuresEmpty() --- End diff -- what does this test? 
--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20560: [SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20560 @rxin It seems you are talking about the followup PR: https://github.com/apache/spark/pull/21072 I think this is the way we do back propagation in catalyst: match a specific node, then traverse down the subtree with the properties. For forward propagation, we also need to carefully handle the nodes that would stop the propagation. In `RemoveRedundantSorts.canEliminateSort`, we are doing the same thing: only listing the nodes that can retain the properties; e.g. `Limit` should stop propagating the sorting property. I think `Project`, `Filter`, and `Hint` are good enough as an initial list, and we can expand it later. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
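A rough, self-contained Scala sketch of that pattern (toy plan nodes, not the Catalyst classes and not the real `RemoveRedundantSorts` rule): once an outer `Sort` is matched, the rule walks down only through nodes known to be safe (`Project`, `Filter`), and anything else, such as `Limit`, stops the propagation.

```scala
object RemoveRedundantSortsSketch {
  sealed trait Plan
  case class Sort(child: Plan) extends Plan
  case class Project(child: Plan) extends Plan
  case class Filter(child: Plan) extends Plan
  case class Limit(n: Int, child: Plan) extends Plan
  case object Scan extends Plan

  // Invoked once an outer Sort has been matched: inner Sorts reachable through
  // order-insensitive nodes (Project, Filter) are redundant and can be dropped.
  // Limit is order-sensitive (it keeps the first n rows of its input), so the
  // traversal stops there instead of removing Sorts beneath it.
  def removeSortsBelow(plan: Plan): Plan = plan match {
    case Sort(child)    => removeSortsBelow(child)
    case Project(child) => Project(removeSortsBelow(child))
    case Filter(child)  => Filter(removeSortsBelow(child))
    case other          => other // Limit, joins, aggregates, ...: stop propagation
  }

  def rule(plan: Plan): Plan = plan match {
    case Sort(child) => Sort(removeSortsBelow(child)) // keep the outermost Sort
    case other       => other
  }

  // Example: rule(Sort(Project(Sort(Scan)))) == Sort(Project(Scan))
}
```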
[GitHub] spark issue #21157: [SPARK-22674][PYTHON] Removed the namedtuple pickling pa...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21157 Solid -1 if it breaks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21160: [SPARK-24094][SS][MINOR] Change description strings of v...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21160 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21160: [SPARK-24094][SS][MINOR] Change description strings of v...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21160 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2684/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21160: [SPARK-24094][SS][MINOR] Change description strings of v...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21160 **[Test build #89866 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89866/testReport)** for PR 21160 at commit [`c008809`](https://github.com/apache/spark/commit/c0088093768798627d188423bf414d4987442255). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21160: [SPARK-24094][MINOR][SS] Change description strin...
GitHub user tdas opened a pull request: https://github.com/apache/spark/pull/21160 [SPARK-24094][MINOR][SS] Change description strings of v2 streaming sources to reflect the change ## What changes were proposed in this pull request? This makes it easy to understand at runtime which version is running. Great for debugging production issues. ## How was this patch tested? Not necessary. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tdas/spark SPARK-24094 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/21160.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #21160 commit c0088093768798627d188423bf414d4987442255 Author: Tathagata Das Date: 2018-04-26T01:48:07Z Changed --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21157: [SPARK-22674][PYTHON] Removed the namedtuple pickling pa...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21157 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89865/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21157: [SPARK-22674][PYTHON] Removed the namedtuple pickling pa...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21157 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21157: [SPARK-22674][PYTHON] Removed the namedtuple pickling pa...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21157 **[Test build #89865 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89865/testReport)** for PR 21157 at commit [`eadc0c8`](https://github.com/apache/spark/commit/eadc0c8af853a57ee80f5e80fb708451931eedc0). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21141: [SPARK-23853][PYSPARK][TEST] Run Hive-related PyS...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21141#discussion_r184250774 --- Diff: python/pyspark/sql/tests.py --- @@ -3021,6 +3021,17 @@ def test_sort_with_nulls_order(self): class HiveSparkSubmitTests(SparkSubmitTests): +@classmethod +def setUpClass(cls): +import glob +from pyspark.find_spark_home import _find_spark_home + +SPARK_HOME = _find_spark_home() +filename_pattern = ("sql/hive/target/spark-hive_*-sources.jar") +if not glob.glob(os.path.join(SPARK_HOME, filename_pattern)): +raise unittest.SkipTest( --- End diff -- I think we should see if @holdenk likes this or not too while we are here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21050: [SPARK-23912][SQL]add array_distinct
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21050 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21050: [SPARK-23912][SQL]add array_distinct
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21050 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89862/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21050: [SPARK-23912][SQL]add array_distinct
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21050 **[Test build #89862 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89862/testReport)** for PR 21050 at commit [`f55abae`](https://github.com/apache/spark/commit/f55abae9df52455a7d7d9a2107041349cfb74853). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21157: [SPARK-22674][PYTHON] Removed the namedtuple pickling pa...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21157 **[Test build #89865 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89865/testReport)** for PR 21157 at commit [`eadc0c8`](https://github.com/apache/spark/commit/eadc0c8af853a57ee80f5e80fb708451931eedc0). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21157: [SPARK-22674][PYTHON] Removed the namedtuple pickling pa...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21157 Why don't we try to fix it rather than removing it? Does the test even pass? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21157: [SPARK-22674][PYTHON] Removed the namedtuple pickling pa...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21157 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21070 I just ran `TPCDSQueryBenchmark` on an ec2 instance (m4.2xlarge). The job to check for performance regressions is too heavy, so we just check that `tpcds`, `tpch`, and `ssb` queries compile correctly in `TPCDSQuerySuite`, `TPCHQuerySuite`, and `SSBQuerySuite`. BTW, I sometimes check for regressions myself in my repo: https://github.com/maropu/spark-tpcds-datagen. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21144: [SPARK-24043][SQL] Interpreted Predicate should initiali...
Github user bersprockets commented on the issue: https://github.com/apache/spark/pull/21144 @hvanhovell I couldn't find another path that had this particular issue, except possibly in the aggregation area (where a grouping projection is created but not initialized), but I couldn't make it fail there. Also, I turned off as much of the code generation as I could (through patching, etc.) and ran the SQL unit tests. I didn't get any 'requirement failed' exceptions. However, I did get 1270 failed tests, 90% of which failed with UnsupportedOperationExceptions (SortPrefix.eval is the one I keep seeing). Fixing those issues may allow more 'requirement failed' exceptions to surface. The original unit test that got me looking at this ("SPARK-10740: handle nondeterministic expressions correctly for set operations") is still failing, but for a different reason: the rows in the table are processed in a different order between interpreted and codegen mode. rand(7) is used as a filter, and although the sequence of random numbers is the same, the row to which each random number is compared differs between interpreted and codegen mode. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
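A minimal Scala sketch of that last point (illustrative only, using `scala.util.Random` rather than Spark's `Rand` expression): with a fixed seed the random sequence is identical, but pairing it with rows that arrive in a different order filters a different set of rows.

```scala
import scala.util.Random

object RandOrderSketch {
  // The i-th random draw is paired with the i-th row seen, so row order matters.
  def filterWithSeed(rows: Seq[Int], seed: Long): Seq[Int] = {
    val rng = new Random(seed)
    rows.filter(_ => rng.nextDouble() < 0.5)
  }

  def main(args: Array[String]): Unit = {
    val interpretedOrder = Seq(1, 2, 3, 4, 5, 6)
    val codegenOrder     = Seq(4, 1, 6, 2, 5, 3)
    // Same seed, same random sequence, but (typically) different rows survive.
    println(filterWithSeed(interpretedOrder, 7))
    println(filterWithSeed(codegenOrder, 7))
  }
}
```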
[GitHub] spark pull request #21056: [SPARK-23849][SQL] Tests for samplingRatio of jso...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21056 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21142: [SPARK-24069][R] Add array_min / array_max functi...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21142 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21143 This is a very good point! Unfortunately Spark SQL doesn't support changing the physical plan on a per-split basis, and I'd say this feature is non-trivial to implement and needs a design doc. Once we have this feature, it might make sense to shift some of the mixin traits from `DataSourceReader` to `DataReaderFactory`, so that they can be applied per split. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
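Purely as a hypothetical shape (none of these traits or classes are part of the DataSourceV2 API; filters are plain strings just to keep the example self-contained), a per-split mixin could let each read task report which of the pushed filters it could not handle:

```scala
trait SupportsPerSplitFilters {
  /** Filters this particular split could not apply; Spark would re-evaluate them. */
  def unhandledFilters(pushed: Seq[String]): Seq[String]
}

case class ParquetSplitTask(path: String) extends SupportsPerSplitFilters {
  override def unhandledFilters(pushed: Seq[String]): Seq[String] = Nil    // all pushed down
}

case class AvroSplitTask(path: String) extends SupportsPerSplitFilters {
  override def unhandledFilters(pushed: Seq[String]): Seq[String] = pushed // nothing pushed down
}
```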
[GitHub] spark issue #21142: [SPARK-24069][R] Add array_min / array_max functions
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21142 Merged to master. Thanks for reviewing this @felixcheung. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21056 Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21141: [SPARK-23853][PYSPARK][TEST] Run Hive-related PyS...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21141#discussion_r184247014 --- Diff: python/pyspark/sql/tests.py --- @@ -3021,6 +3021,17 @@ def test_sort_with_nulls_order(self): class HiveSparkSubmitTests(SparkSubmitTests): +@classmethod +def setUpClass(cls): +import glob +from pyspark.find_spark_home import _find_spark_home + +SPARK_HOME = _find_spark_home() +filename_pattern = ("sql/hive/target/spark-hive_*-sources.jar") +if not glob.glob(os.path.join(SPARK_HOME, filename_pattern)): +raise unittest.SkipTest( --- End diff -- Yea, actually, I think it would be nicer in `setUp` for a better(?) message ... I replaced `unittest.SkipTest` with `self.skipTest` per https://docs.python.org/2/library/unittest.html#unittest.SkipTest and https://docs.python.org/2/library/unittest.html#unittest.TestCase.skipTest --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21155: SPARK-23927: Add "sequence" expression
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21155 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21056 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21056 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89861/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21155: SPARK-23927: Add "sequence" expression
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21155 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/89860/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21056: [SPARK-23849][SQL] Tests for samplingRatio of json datas...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21056 **[Test build #89861 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89861/testReport)** for PR 21056 at commit [`2bd14e2`](https://github.com/apache/spark/commit/2bd14e26adbf8d7e306b1f8915e059c3e8f01fb3). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21155: SPARK-23927: Add "sequence" expression
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21155 **[Test build #89860 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89860/testReport)** for PR 21155 at commit [`a7c0ccd`](https://github.com/apache/spark/commit/a7c0ccdfa7b2f78ab3ae29aa6b5b1c16dfc38f9c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21138: [SPARK-24062][Thrift Server] Fix SASL encryption cannot ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21138 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21138: [SPARK-24062][Thrift Server] Fix SASL encryption cannot ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21138 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2683/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21138: [SPARK-24062][Thrift Server] Fix SASL encryption cannot ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21138 **[Test build #89864 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89864/testReport)** for PR 21138 at commit [`0077685`](https://github.com/apache/spark/commit/00776858c2e776f46dbe542effe52a19283e752f). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21138: [SPARK-24062][Thrift Server] Fix SASL encryption cannot ...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/21138 Jenkins, retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21138: [SPARK-24062][Thrift Server] Fix SASL encryption cannot ...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/21138 Thanks for the review @mridulm @vanzin . Let me test again. I will merge the code when test is passed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21143: [SPARK-24072][SQL] clearly define pushed filters
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21143 Thanks for working on this, @cloud-fan! I was thinking about needing it just recently so that data sources can delegate to Spark when needed. I'll have a thorough look at it tomorrow, but one quick high-level question: should we make these residuals based on the input split instead? Input splits might have different residual filters that need to be applied. For example, if you have a time range query, `ts > X`, and are storing data by day, then you know that when `day(ts) > day(X)`, `ts > X` *must* be true, but when `day(ts) = day(X)`, `ts > X` only *might* be true. So you need to run the original filter only for some splits (the ones scanning the boundary day), but not for any other splits. Another use case for a per-split residual is when splits might be different file formats. Parquet allows pushing down filters, but Avro doesn't. In a mixed-format table it would be great for Avro splits to return the entire expression as a residual, while Parquet splits do the filtering. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
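A small Scala sketch of that first example (hedged: `dayOf`, the millisecond timestamps, and `residualFor` are assumptions for illustration, not any Spark API): a split whose day is strictly after `day(X)` needs no residual, while the boundary-day split must re-apply the original `ts > X` per row.

```scala
object ResidualSketch {
  // Timestamps are epoch millis; a split is identified by the day of data it stores.
  def dayOf(ts: Long): Long = ts / 86400000L // millis per day

  // None => every row in this split is guaranteed to satisfy ts > x, no residual needed.
  // Some => this is the boundary-day split, so the original predicate must be re-applied.
  // (Splits with splitDay < dayOf(x) would be pruned and never scanned at all.)
  def residualFor(splitDay: Long, x: Long): Option[Long => Boolean] =
    if (splitDay > dayOf(x)) None
    else Some(ts => ts > x)
}
```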
[GitHub] spark pull request #21063: [SPARK-23886][Structured Streaming] Update query ...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21063#discussion_r184241625 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala --- @@ -111,7 +112,12 @@ trait ProgressReporter extends Logging { logDebug("Starting Trigger Calculation") lastTriggerStartTimestamp = currentTriggerStartTimestamp currentTriggerStartTimestamp = triggerClock.getTimeMillis() -currentStatus = currentStatus.copy(isTriggerActive = true) +// isTriggerActive field is kept false for ContinuousExecution +// since it is tied to MicroBatchExecution +this match { --- End diff -- nit: someone may be concerned that a trait needs to be aware of its concrete implementation. There look to be two options: 1. extract a method that only updates currentStatus when a trigger starts, defaulting to `isTriggerActive = true`, and let `ContinuousExecution` override it (see the sketch below). 2. just override `startTrigger()` in `ContinuousExecution`, call `super.startTrigger()`, and update currentStatus once again. That might open a very small window for other threads to read invalid status information (isTriggerActive = true), but requires fewer changes if that is acceptable. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
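A minimal Scala sketch of option 1 (simplified, assumed names such as `ProgressReporterSketch`; this is not the actual `ProgressReporter` trait): the trait asks an overridable hook for the initial trigger state, so it never has to pattern-match on its concrete implementation.

```scala
trait ProgressReporterSketch {
  protected var isTriggerActive: Boolean = false

  // Micro-batch style executions keep the default; continuous executions override it.
  protected def triggerActiveOnStart: Boolean = true

  def startTrigger(): Unit = {
    isTriggerActive = triggerActiveOnStart
    // ... record lastTriggerStartTimestamp / currentTriggerStartTimestamp, etc.
  }
}

class ContinuousExecutionSketch extends ProgressReporterSketch {
  override protected def triggerActiveOnStart: Boolean = false
}
```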
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user henryr commented on the issue: https://github.com/apache/spark/pull/21070 yes, thanks @maropu! +1 to the idea of making this a jenkins job. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/21070 Thank you @maropu! What resources does the run require? Is it something we could create a Jenkins job to run? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21141: [SPARK-23853][PYSPARK][TEST] Run Hive-related PyS...
Github user bersprockets commented on a diff in the pull request: https://github.com/apache/spark/pull/21141#discussion_r184241270 --- Diff: python/pyspark/sql/tests.py --- @@ -3021,6 +3021,17 @@ def test_sort_with_nulls_order(self): class HiveSparkSubmitTests(SparkSubmitTests): +@classmethod +def setUpClass(cls): +import glob +from pyspark.find_spark_home import _find_spark_home + +SPARK_HOME = _find_spark_home() +filename_pattern = ("sql/hive/target/spark-hive_*-sources.jar") +if not glob.glob(os.path.join(SPARK_HOME, filename_pattern)): +raise unittest.SkipTest( --- End diff -- Will this result in the right kind of message, particularly the kind that @HyukjinKwon is checking for in PR #21107? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21070: [SPARK-23972][BUILD][SQL] Update Parquet to 1.10.0.
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21070 @gatorsmile @rdblue I checked the numbers and I found no regression at least in TPC-DS: to check the actual numbers, see https://issues.apache.org/jira/browse/SPARK-24070. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21152: [SPARK-23688][SS] Refactor tests away from rate s...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21152#discussion_r184236837 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala --- @@ -66,157 +66,115 @@ class ContinuousSuite extends ContinuousSuiteBase { val input = ContinuousMemoryStream[Int] testStream(input.toDF())( - AddData(input, 0, 1, 2), - CheckAnswer(0, 1, 2), + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2): _*), StopStream, - AddData(input, 3, 4, 5), + AddData(input, 3.to(5): _*), StartStream(), - CheckAnswer(0, 1, 2, 3, 4, 5)) + CheckAnswer(0.to(5): _*)) } test("map") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .map(r => r.getLong(0) * 2) +val input = ContinuousMemoryStream[Int] +val df = input.toDF().map(_.getInt(0) * 2) -testStream(df, useV2Sink = true)( - StartStream(longContinuousTrigger), - AwaitEpoch(0), - Execute(waitForRateSourceTriggers(_, 2)), - IncrementEpoch(), - Execute(waitForRateSourceTriggers(_, 4)), - IncrementEpoch(), - CheckAnswerRowsContains(scala.Range(0, 40, 2).map(Row(_ +testStream(df)( + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2).map(_ * 2): _*), + StopStream, + AddData(input, 3.to(5): _*), + StartStream(), + CheckAnswer(0.to(5).map(_ * 2): _*)) } test("flatMap") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .flatMap(r => Seq(0, r.getLong(0), r.getLong(0) * 2)) +val input = ContinuousMemoryStream[Int] +val df = input.toDF().flatMap(r => Seq(0, r.getInt(0), r.getInt(0) * 2)) -testStream(df, useV2Sink = true)( - StartStream(longContinuousTrigger), - AwaitEpoch(0), - Execute(waitForRateSourceTriggers(_, 2)), - IncrementEpoch(), - Execute(waitForRateSourceTriggers(_, 4)), - IncrementEpoch(), - CheckAnswerRowsContains(scala.Range(0, 20).flatMap(n => Seq(0, n, n * 2)).map(Row(_ +testStream(df)( + AddData(input, 0.to(2): _*), + CheckAnswer(0.to(2).flatMap(n => Seq(0, n, n * 2)): _*), + StopStream, + AddData(input, 3.to(5): _*), + StartStream(), + CheckAnswer(0.to(5).flatMap(n => Seq(0, n, n * 2)): _*)) } test("filter") { -val df = spark.readStream - .format("rate") - .option("numPartitions", "5") - .option("rowsPerSecond", "5") - .load() - .select('value) - .where('value > 5) +val input = ContinuousMemoryStream[Int] +val df = input.toDF().where('value > 5) --- End diff -- @jose-torres What do you think about this? Would it be better to have tests for untyped and typed? Code duplication is not that huge since I guess logic for verification can be reused for every test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to R...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/21145#discussion_r184235329 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -299,13 +299,13 @@ private[kafka010] class KafkaMicroBatchReader( } } -/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */ +/** A [[ReadTask]] for reading Kafka data in a micro-batch streaming query. */ private[kafka010] case class KafkaMicroBatchDataReaderFactory( --- End diff -- This fixes the API, not implementations, and it already touches 30+ files. I'd rather not fix the downstream classes for two reasons. First, to avoid this becoming really large. Second, we need to be able to evolve these APIs without requiring changes to all implementations. This is still evolving and if we need to update 20+ implementations to make simple changes, then I think that's a problem. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21145: [SPARK-24073][SQL]: Rename DataReaderFactory to R...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21145#discussion_r184232476 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala --- @@ -299,13 +299,13 @@ private[kafka010] class KafkaMicroBatchReader( } } -/** A [[DataReaderFactory]] for reading Kafka data in a micro-batch streaming query. */ +/** A [[ReadTask]] for reading Kafka data in a micro-batch streaming query. */ private[kafka010] case class KafkaMicroBatchDataReaderFactory( --- End diff -- @rdblue . The re-naming of this PR seems to be partial. Could you replace `KafkaMicroBatchDataReaderFactory` with `KafkaMicroBatchReadTask` together in this PR? I guess there will be more occurrences like this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21088: [SPARK-24003][CORE] Add support to provide spark.executo...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/21088 It would be nice to add this to YARN's AM too. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21118 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/2682/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21118 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21118: SPARK-23325: Use InternalRow when reading with DataSourc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21118 **[Test build #89863 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/89863/testReport)** for PR 21118 at commit [`16f1b6e`](https://github.com/apache/spark/commit/16f1b6e7fd8b658feabf25d7c1a354390dcd8eaa). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21122: [SPARK-24017] [SQL] Refactor ExternalCatalog to b...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21122#discussion_r184228752 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalog.scala --- @@ -31,10 +30,16 @@ import org.apache.spark.util.ListenerBus * * Implementations should throw [[NoSuchDatabaseException]] when databases don't exist. */ -abstract class ExternalCatalog - extends ListenerBus[ExternalCatalogEventListener, ExternalCatalogEvent] { +trait ExternalCatalog { --- End diff -- I see. Thank you for confirming, @gatorsmile . --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org