GitHub user wangyum commented on a diff in the pull request:
https://github.com/apache/spark/pull/20020#discussion_r212850054
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/command/DataWritingCommand.scala ---
@@ -20,30 +20,32 @@ package org.apache.spark.sql.execution.command
import org.apache.hadoop.conf.Configuration
import org.apache.spark.SparkContext
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
+import org.apache.spark.sql.execution.datasources.FileFormatWriter
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.util.SerializableConfiguration
-
/**
- * A special `RunnableCommand` which writes data out and updates metrics.
+ * A special `Command` which writes data out and updates metrics.
*/
-trait DataWritingCommand extends RunnableCommand {
-
+trait DataWritingCommand extends Command {
/**
* The input query plan that produces the data to be written.
+ * IMPORTANT: the input query plan MUST be analyzed, so that we can carry its output columns
+ * to [[FileFormatWriter]].
*/
def query: LogicalPlan
- // We make the input `query` an inner child instead of a child in order to hide it from the
- // optimizer. This is because optimizer may not preserve the output schema names' case, and we
- // have to keep the original analyzed plan here so that we can pass the corrected schema to the
- // writer. The schema of analyzed plan is what user expects(or specifies), so we should respect
- // it when writing.
- override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
+ override final def children: Seq[LogicalPlan] = query :: Nil
- override lazy val metrics: Map[String, SQLMetric] = {
+ // Output columns of the analyzed input query plan
+ def outputColumns: Seq[Attribute]
--- End diff ---
`outputColumns` changed from the analyzed plan's attributes to the optimized plan's attributes, so the user-specified column-name case can be lost. For example:
```scala
withTempDir { dir =>
  val path = dir.getCanonicalPath
  val cnt = 30
  val table1Path = s"$path/table1"
  val table3Path = s"$path/table3"
  spark.range(cnt).selectExpr("cast(id as bigint) as col1", "cast(id % 3 as bigint) as col2")
    .write.mode(SaveMode.Overwrite).parquet(table1Path)
  withTable("table1", "table3") {
    spark.sql(
      s"CREATE TABLE table1(col1 bigint, col2 bigint) using parquet location '$table1Path/'")
    spark.sql("CREATE TABLE table3(COL1 bigint, COL2 bigint) using parquet " +
      "PARTITIONED BY (COL2) " +
      s"CLUSTERED BY (COL1) INTO 2 BUCKETS location '$table3Path/'")
    withView("view1") {
      spark.sql("CREATE VIEW view1 as select col1, col2 from table1 where col1 > -20")
      spark.sql("INSERT OVERWRITE TABLE table3 select COL1, COL2 from view1 CLUSTER BY COL1")
      spark.table("table3").show
    }
  }
}
```
```
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(COL1#19L, COL2#20L)
outputColumns: List(col1#16L, col2#17L)
outputColumns: List(col1#16L, col2#17L)
outputColumns: List(col1#16L, col2#17L)
```
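
If the writer should always see the analyzed (user-specified) column names, one way to make that robust is to record the names as plain strings when the command is built from the analyzed plan and re-apply them to whatever attributes the query exposes when the command runs. The sketch below only illustrates the idea; `PreserveAnalyzedNames` is a hypothetical helper, not existing Spark code, and it assumes the later plan still produces the same columns in the same order.
```scala
import org.apache.spark.sql.catalyst.expressions.Attribute

// Hypothetical helper, for illustration only (not existing Spark code).
object PreserveAnalyzedNames {
  // Re-apply the column names captured from the analyzed plan
  // (e.g. query.output.map(_.name) taken at command-construction time)
  // onto the attributes the query exposes when the command actually runs.
  def apply(queryOutput: Seq[Attribute], analyzedNames: Seq[String]): Seq[Attribute] = {
    assert(queryOutput.length == analyzedNames.length,
      "analyzed and current plans must expose the same number of output columns")
    queryOutput.zip(analyzedNames).map {
      // Keep the attribute as-is when the name already matches.
      case (attr, name) if attr.name == name => attr
      // Otherwise restore the analyzed name, e.g. COL1 instead of col1.
      case (attr, name) => attr.withName(name)
    }
  }
}
```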