beliefer commented on code in PR #41839:
URL: https://github.com/apache/spark/pull/41839#discussion_r1251471826
##########
sql/core/src/test/scala/org/apache/spark/sql/SparkSessionExtensionSuite.scala:
##########
@@ -278,37 +278,42 @@ class SparkSessionExtensionSuite extends SparkFunSuite with SQLHelper {
MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule()))
}
withSession(extensions) { session =>
- session.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED, enableAQE)
- assert(session.sessionState.columnarRules.contains(
- MyColumnarRule(PreRuleReplaceAddWithBrokenVersion(), MyPostRule())))
- import session.sqlContext.implicits._
- // perform a join to inject a broadcast exchange
- val left = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("l1", "l2")
- val right = Seq((1, 50L), (2, 100L), (3, 150L)).toDF("r1", "r2")
- val data = left.join(right, $"l1" === $"r1")
- // repartitioning avoids having the add operation pushed up into the LocalTableScan
- .repartition(1)
- val df = data.selectExpr("l2 + r2")
- // execute the plan so that the final adaptive plan is available when AQE is on
- df.collect()
- val found = collectPlanSteps(df.queryExecution.executedPlan).sum
- // 1 MyBroadcastExchangeExec
- // 1 MyShuffleExchangeExec
- // 1 ColumnarToRowExec
- // 2 ColumnarProjectExec
- // 1 ReplacedRowToColumnarExec
- // so 11121 is expected.
- assert(found == 11121)
-
- // Verify that we get back the expected, wrong, result
- val result = df.collect()
- assert(result(0).getLong(0) == 101L) // Check that broken columnar Add was used.
- assert(result(1).getLong(0) == 201L)
- assert(result(2).getLong(0) == 301L)
-
- withTempPath { path =>
- val e = intercept[Exception](df.write.parquet(path.getCanonicalPath))
- assert(e.getMessage == "columnar write")
+ Seq(true, false).foreach { enableEvaluator =>
+ withSQLConf(
+ SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString,
Review Comment:
Shall we move this config outside?
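For illustration, a minimal sketch of that refactoring, hoisting the loop-invariant AQE config out of the evaluator loop; `evaluatorConfKey` is a hypothetical stand-in for the config key elided in the quoted diff:

```scala
// Hypothetical sketch, not the PR's actual code. `evaluatorConfKey` stands
// in for the evaluator config key that is elided in the quoted diff.
withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> enableAQE.toString) {
  Seq(true, false).foreach { enableEvaluator =>
    withSQLConf(evaluatorConfKey -> enableEvaluator.toString) {
      // ... unchanged body of the test ...
    }
  }
}
```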
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala:
##########
@@ -453,51 +454,25 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
)
override def doExecuteColumnar(): RDD[ColumnarBatch] = {
- val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
val numInputRows = longMetric("numInputRows")
val numOutputBatches = longMetric("numOutputBatches")
// Instead of creating a new config we are reusing columnBatchSize. In the future if we do
// combine with some of the Arrow conversion tools we will need to unify some of the configs.
val numRows = conf.columnBatchSize
- // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
- // plan (this) in the closure.
Review Comment:
ditto
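For context, a hedged sketch of where the relocated comment could live; the factory name `RowToColumnarEvaluatorFactory` and its parameter list are assumptions mirroring the `ColumnarToRowEvaluatorFactory` mentioned below:

```scala
import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical sketch: the name and parameters are assumptions, not the PR's code.
class RowToColumnarEvaluatorFactory(
    enableOffHeapColumnVector: Boolean,
    numRows: Int,
    // Passing the resolved schema into the factory avoids calling `schema`
    // in the RDD closure, so the entire plan (this) does not need to be
    // included in the closure.
    schema: StructType)
  extends PartitionEvaluatorFactory[InternalRow, ColumnarBatch] {
  override def createEvaluator(): PartitionEvaluator[InternalRow, ColumnarBatch] = ???
}
```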
##########
sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlanSuite.scala:
##########
@@ -126,19 +126,22 @@ class SparkPlanSuite extends QueryTest with SharedSparkSession {
}
test("SPARK-37779: ColumnarToRowExec should be canonicalizable after being
(de)serialized") {
- withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
- withTempPath { path =>
- spark.range(1).write.parquet(path.getAbsolutePath)
- val df = spark.read.parquet(path.getAbsolutePath)
- val columnarToRowExec =
- df.queryExecution.executedPlan.collectFirst { case p: ColumnarToRowExec => p }.get
- try {
- spark.range(1).foreach { _ =>
- columnarToRowExec.canonicalized
- ()
+ Seq(true, false).foreach { enable =>
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet",
Review Comment:
ditto
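Presumably the same refactoring as suggested above, sketched for this test; `evaluatorConfKey` is again a hypothetical stand-in for the config key elided in the quoted diff:

```scala
// Hypothetical sketch, not the PR's actual code.
withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
  Seq(true, false).foreach { enable =>
    withSQLConf(evaluatorConfKey -> enable.toString) {
      // ... unchanged body of the test ...
    }
  }
}
```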
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala:
##########
@@ -89,15 +87,18 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
val numInputBatches = longMetric("numInputBatches")
- // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
- // plan (this) in the closure.
Review Comment:
It seems we need to move this comment into `ColumnarToRowEvaluatorFactory` too.
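A hedged sketch of what that could look like; the parameter list of `ColumnarToRowEvaluatorFactory` here is an assumption, not the PR's actual code:

```scala
import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical sketch: the parameter list is an assumption, not the PR's code.
class ColumnarToRowEvaluatorFactory(
    // Passing the child's output into the factory avoids calling `output`
    // in the RDD closure, so the entire plan (this) does not need to be
    // included in the closure.
    childOutput: Seq[Attribute])
  extends PartitionEvaluatorFactory[ColumnarBatch, InternalRow] {
  override def createEvaluator(): PartitionEvaluator[ColumnarBatch, InternalRow] = ???
}
```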
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]