This is an automated email from the ASF dual-hosted git repository.

yao pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 8cd3c1a9c1c [SPARK-45568][TESTS] Fix flaky WholeStageCodegenSparkSubmitSuite
8cd3c1a9c1c is described below

commit 8cd3c1a9c1c336155fe09728171aba84ef55ef2d
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Oct 17 22:19:18 2023 +0800

    [SPARK-45568][TESTS] Fix flaky WholeStageCodegenSparkSubmitSuite

    ### What changes were proposed in this pull request?

    WholeStageCodegenSparkSubmitSuite is [flaky](https://github.com/apache/spark/actions/runs/6479534195/job/17593342589) because SHUFFLE_PARTITIONS (200) creates 200 reducers for a single total core, and an improper stop procedure causes executor launcher retries. The heavy load and the retries might result in timeout test failures.

    ### Why are the changes needed?

    CI robustness

    ### Does this PR introduce _any_ user-facing change?

    no

    ### How was this patch tested?

    existing WholeStageCodegenSparkSubmitSuite

    ### Was this patch authored or co-authored using generative AI tooling?

    no

    Closes #43394 from yaooqinn/SPARK-45568.

    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Kent Yao <y...@apache.org>
    (cherry picked from commit f00ec39542a5f9ac75d8c24f0f04a7be703c8d7c)
    Signed-off-by: Kent Yao <y...@apache.org>
---
 .../WholeStageCodegenSparkSubmitSuite.scala        | 57 ++++++++++++----------
 1 file changed, 30 insertions(+), 27 deletions(-)
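For readers who want the gist before parsing the diff below, here is a minimal, self-contained sketch (not part of the commit) of the pattern the patch applies: cap the shuffle partition count so a one-core test executor is not asked to run 200 reducers, and stop the session in a finally block so the JVM exits cleanly instead of leaving the executor launcher to retry. The names ShuffleBoundedTestApp and runTestQuery are illustrative placeholders; only SQLConf.SHUFFLE_PARTITIONS and the try/finally-around-stop() shape come from the patch itself.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.internal.SQLConf

    // Hypothetical test driver, not the actual suite from this commit.
    object ShuffleBoundedTestApp {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          // Default is 200; a small value avoids flooding a single core
          // with tiny reduce tasks.
          .config(SQLConf.SHUFFLE_PARTITIONS.key, "2")
          .getOrCreate()
        try {
          runTestQuery(spark)
        } finally {
          // Always release executors, even when an assertion throws.
          spark.stop()
        }
      }

      // Stand-in for the real assertions exercised by the suite.
      private def runTestQuery(spark: SparkSession): Unit = {
        assert(spark.range(10).count() == 10)
      }
    }

The two halves address the two failure causes named above: the config keeps the load on a one-core executor manageable, and the guaranteed stop() prevents the lingering-session retries that pushed the test past its timeout.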
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
index 73c4e4c3e1e..06ba8fb772a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSparkSubmitSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.deploy.SparkSubmitTestUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{QueryTest, Row, SparkSession}
 import org.apache.spark.sql.functions.{array, col, count, lit}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.IntegerType
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.ResetSystemProperties
@@ -68,39 +69,41 @@ class WholeStageCodegenSparkSubmitSuite extends SparkSubmitTestUtils
 
 object WholeStageCodegenSparkSubmitSuite extends Assertions with Logging {
 
-  var spark: SparkSession = _
-
   def main(args: Array[String]): Unit = {
     TestUtils.configTestLog4j2("INFO")
 
-    spark = SparkSession.builder().getOrCreate()
+    val spark = SparkSession.builder()
+      .config(SQLConf.SHUFFLE_PARTITIONS.key, "2")
+      .getOrCreate()
+
+    try {
+      // Make sure the test is run where the driver and the executors uses different object layouts
+      val driverArrayHeaderSize = Platform.BYTE_ARRAY_OFFSET
+      val executorArrayHeaderSize =
+        spark.sparkContext.range(0, 1).map(_ => Platform.BYTE_ARRAY_OFFSET).collect().head
+      assert(driverArrayHeaderSize > executorArrayHeaderSize)
 
-    // Make sure the test is run where the driver and the executors uses different object layouts
-    val driverArrayHeaderSize = Platform.BYTE_ARRAY_OFFSET
-    val executorArrayHeaderSize =
-      spark.sparkContext.range(0, 1).map(_ => Platform.BYTE_ARRAY_OFFSET).collect.head.toInt
-    assert(driverArrayHeaderSize > executorArrayHeaderSize)
+      val df = spark.range(71773).select((col("id") % lit(10)).cast(IntegerType) as "v")
+        .groupBy(array(col("v"))).agg(count(col("*")))
+      val plan = df.queryExecution.executedPlan
+      assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]))
 
-    val df = spark.range(71773).select((col("id") % lit(10)).cast(IntegerType) as "v")
-      .groupBy(array(col("v"))).agg(count(col("*")))
-    val plan = df.queryExecution.executedPlan
-    assert(plan.exists(_.isInstanceOf[WholeStageCodegenExec]))
+      val expectedAnswer =
+        Row(Array(0), 7178) ::
+        Row(Array(1), 7178) ::
+        Row(Array(2), 7178) ::
+        Row(Array(3), 7177) ::
+        Row(Array(4), 7177) ::
+        Row(Array(5), 7177) ::
+        Row(Array(6), 7177) ::
+        Row(Array(7), 7177) ::
+        Row(Array(8), 7177) ::
+        Row(Array(9), 7177) :: Nil
 
-    val expectedAnswer =
-      Row(Array(0), 7178) ::
-      Row(Array(1), 7178) ::
-      Row(Array(2), 7178) ::
-      Row(Array(3), 7177) ::
-      Row(Array(4), 7177) ::
-      Row(Array(5), 7177) ::
-      Row(Array(6), 7177) ::
-      Row(Array(7), 7177) ::
-      Row(Array(8), 7177) ::
-      Row(Array(9), 7177) :: Nil
-    val result = df.collect
-    QueryTest.sameRows(result.toSeq, expectedAnswer) match {
-      case Some(errMsg) => fail(errMsg)
-      case _ =>
+      QueryTest.checkAnswer(df, expectedAnswer)
+    } finally {
+      spark.stop()
     }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org