dtenedor commented on code in PR #36077:
URL: https://github.com/apache/spark/pull/36077#discussion_r845408295


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -239,12 +268,33 @@ case class ResolveDefaultColumns(
       val lookup = catalog.lookupRelation(tableName)
       lookup match {
         case SubqueryAlias(_, r: UnresolvedCatalogRelation) =>
-          Some(StructType(r.tableMeta.schema.fields.dropRight(
-            enclosingInsert.get.partitionSpec.size)))
-        case _ => None
+          StructType(r.tableMeta.schema.fields.dropRight(
+            enclosingInsert.get.partitionSpec.size))
+        case _ => return None
       }
     } catch {
-      case _: NoSuchTableException => None
+      case _: AnalysisException => return None
+    }
+    // Rearrange the columns in the result schema to match the order of the 
explicit column list,
+    // if any.
+    val userSpecifiedCols: Seq[String] = enclosingInsert.get.userSpecifiedCols
+    if (userSpecifiedCols.isEmpty) {
+      return Some(schema)
     }
+    val colNamesToFields: Map[String, StructField] =
+      schema.fields.map {
+        field: StructField => field.name -> field
+      }.toMap
+    val userSpecifiedFields: Seq[StructField] =
+      userSpecifiedCols.map {
+        name: String => colNamesToFields.getOrElse(name, return None)

Review Comment:
   Sounds good. I added this TreeNodeTag, and added test cases for 
case-insensitive behavior (with and without the `spark.sql.caseSensitive` 
config enabled).
   
   Note, this `ResolveDefaultColumns` rule currently depends on the whole 
`InsertIntoStatement` not being analyzed yet. So now it checks that the new 
TreeNodeTag is unset.



##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -1105,6 +1105,97 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
     }
   }
 
+  test("INSERT INTO with user specified columns and defaults: positive tests") 
{
+    withTable("t") {
+      sql("create table t(i boolean default true, s bigint default 42) using 
parquet")
+      sql("insert into t (i, s) values (true, default)")
+      checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i => 
Row(i)))
+    }
+    withTable("t") {
+      sql("create table t(i boolean default true, s bigint default 42) using 
parquet")
+      sql("insert into t (s, i) values (default, true)")
+      checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i => 
Row(i)))
+    }
+    withTable("t") {
+      sql("create table t(i boolean default true, s bigint default 42) using 
parquet")
+      sql("insert into t (i) values (true)")
+      checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i => 
Row(i)))
+    }
+    withTable("t") {
+      sql("create table t(i boolean default true, s bigint default 42) using 
parquet")
+      sql("insert into t (i) values (default)")
+      checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i => 
Row(i)))
+    }
+    withTable("t") {
+      sql("create table t(i boolean default true, s bigint default 42) using 
parquet")
+      sql("insert into t (s) values (default)")
+      checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i => 
Row(i)))
+    }
+    withTable("t") {
+      sql("create table t(i boolean default true, s bigint default 42) using 
parquet")
+      sql("insert into t (s) select default from (select 1)")
+      checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i => 
Row(i)))
+    }
+    withTable("t") {
+      sql("create table t(i boolean default true, s bigint default 42) using 
parquet")
+      sql("insert into t (i) select true from (select 1)")
+      checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i => 
Row(i)))
+    }
+    withTable("t") {
+      sql("create table t(i boolean, s bigint default 42, q int default 43) 
using parquet")
+      sql("insert into t (i, q) select true from (select 1)")
+      checkAnswer(sql("select s from t where q = 43"), Seq(42L).map(i => 
Row(i)))
+    }
+    // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is 
enabled, and no
+    // explicit DEFAULT value is available when the INSERT INTO statement 
provides fewer
+    // values than expected, NULL values are appended in their place.
+    withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> 
"true") {
+      withTable("t") {
+        sql("create table t(i boolean, s bigint) using parquet")
+        sql("insert into t (i) values (true)")
+        checkAnswer(sql("select s from t where i = true"), Seq(null).map(i => 
Row(i)))
+      }
+      withTable("t") {
+        sql("create table t(i boolean default true, s bigint) using parquet")
+        sql("insert into t (i) values (default)")
+        checkAnswer(sql("select s from t where i = true"), Seq(null).map(i => 
Row(i)))
+      }
+      withTable("t") {
+        sql("create table t(i boolean, s bigint default 42) using parquet")
+        sql("insert into t (s) values (default)")
+        checkAnswer(sql("select s from t where i is null"), Seq(42L).map(i => 
Row(i)))
+      }
+    }
+  }
+
+  test("INSERT INTO with user specified columns and defaults: negative tests") 
{

Review Comment:
   Good idea, done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -53,25 +53,49 @@ case class ResolveDefaultColumns(
 
   // This field stores the enclosing INSERT INTO command, once we find one.
   var enclosingInsert: Option[InsertIntoStatement] = None
+  // This field stores the schema of the target table of the above command.
+  var insertTableSchemaWithoutPartitionColumns: Option[StructType] = None
 
-  override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsWithPruning(
-    (_ => SQLConf.get.enableDefaultColumns), ruleId) {
-    case i@InsertIntoStatement(_, _, _, _, _, _)
-      if i.query.collectFirst { case u: UnresolvedInlineTable => u }.isDefined 
=>
-      enclosingInsert = Some(i)
-      i
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    // Initialize by clearing our reference to the enclosing INSERT INTO 
command.
+    enclosingInsert = None
+    insertTableSchemaWithoutPartitionColumns = None
+    // Traverse the logical query plan in preorder (top-down).
+    plan.resolveOperatorsWithPruning(
+      (_ => SQLConf.get.enableDefaultColumns), ruleId) {
+      case i@InsertIntoStatement(_, _, _, _, _, _)
+        if i.query.collectFirst { case u: UnresolvedInlineTable => u 
}.isDefined =>
+        enclosingInsert = Some(i)
+        insertTableSchemaWithoutPartitionColumns = 
getInsertTableSchemaWithoutPartitionColumns
+        regenerateUserSpecifiedCols(i)
+
+      case table: UnresolvedInlineTable
+        if enclosingInsert.isDefined &&
+          table.rows.nonEmpty && table.rows.forall(_.size == 
table.rows(0).size) =>
+        val expanded: UnresolvedInlineTable = 
addMissingDefaultColumnValues(table).getOrElse(table)
+        replaceExplicitDefaultColumnValues(analyzer, expanded).getOrElse(table)
+
+      case i@InsertIntoStatement(_, _, _, project: Project, _, _) =>
+        enclosingInsert = Some(i)
+        insertTableSchemaWithoutPartitionColumns = 
getInsertTableSchemaWithoutPartitionColumns
+        val expanded: Project = 
addMissingDefaultColumnValues(project).getOrElse(project)
+        val replaced: Option[LogicalPlan] = 
replaceExplicitDefaultColumnValues(analyzer, expanded)
+        val updated: InsertIntoStatement =
+          if (replaced.isDefined) i.copy(query = replaced.get) else i
+        regenerateUserSpecifiedCols(updated)

Review Comment:
   Apologies for the confusion: the `regenerateUserSpecifiedCols` method actually 
returns an updated `InsertIntoStatement`. I updated the code to explicitly 
assign it to a variable and return that instead. Hopefully this is more 
straightforward now.



##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -1105,6 +1105,97 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
     }
   }
 
+  test("INSERT INTO with user specified columns and defaults: positive tests") 
{
+    withTable("t") {
+      sql("create table t(i boolean default true, s bigint default 42) using 
parquet")
+      sql("insert into t (i, s) values (true, default)")
+      checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i => 
Row(i)))

Review Comment:
   Thanks, this is better! I updated the existing positive tests for INSERT 
INTO commands (without user-specified columns) to verify the results in this 
way as well.



##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -1105,6 +1105,97 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession {
     }
   }
 
+  test("INSERT INTO with user specified columns and defaults: positive tests") 
{
+    withTable("t") {
+      sql("create table t(i boolean default true, s bigint default 42) using 
parquet")

Review Comment:
   Good point! I refactored this into a Seq of INSERT INTO statements since the 
CREATE TABLE and SELECT parts are all the same for all these positive cases!



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -239,12 +268,33 @@ case class ResolveDefaultColumns(
       val lookup = catalog.lookupRelation(tableName)
       lookup match {
         case SubqueryAlias(_, r: UnresolvedCatalogRelation) =>
-          Some(StructType(r.tableMeta.schema.fields.dropRight(
-            enclosingInsert.get.partitionSpec.size)))
-        case _ => None
+          StructType(r.tableMeta.schema.fields.dropRight(
+            enclosingInsert.get.partitionSpec.size))
+        case _ => return None
       }
     } catch {
-      case _: NoSuchTableException => None
+      case _: AnalysisException => return None

Review Comment:
   Good Q. I found that the above `lookupRelation` call can throw other "not 
found" types of exceptions, e.g. `CatalogNotFoundException`. In those cases the 
analyzer would still return a reasonable-looking error message, but it might 
look different depending on whether this rule executed or not. I felt like it 
was simpler to decouple this rule from the table lookup failure entirely and 
let the previous code handle the error case. I added a comment here explaining 
this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to