dtenedor commented on code in PR #36077:
URL: https://github.com/apache/spark/pull/36077#discussion_r845408295
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -239,12 +268,33 @@ case class ResolveDefaultColumns(
val lookup = catalog.lookupRelation(tableName)
lookup match {
case SubqueryAlias(_, r: UnresolvedCatalogRelation) =>
- Some(StructType(r.tableMeta.schema.fields.dropRight(
- enclosingInsert.get.partitionSpec.size)))
- case _ => None
+ StructType(r.tableMeta.schema.fields.dropRight(
+ enclosingInsert.get.partitionSpec.size))
+ case _ => return None
}
} catch {
- case _: NoSuchTableException => None
+ case _: AnalysisException => return None
+ }
+ // Rearrange the columns in the result schema to match the order of the
explicit column list,
+ // if any.
+ val userSpecifiedCols: Seq[String] = enclosingInsert.get.userSpecifiedCols
+ if (userSpecifiedCols.isEmpty) {
+ return Some(schema)
}
+ val colNamesToFields: Map[String, StructField] =
+ schema.fields.map {
+ field: StructField => field.name -> field
+ }.toMap
+ val userSpecifiedFields: Seq[StructField] =
+ userSpecifiedCols.map {
+ name: String => colNamesToFields.getOrElse(name, return None)
Review Comment:
Sounds good. I added this TreeNodeTag, and added test cases for
case-insensitive behavior (with and without the `spark.sql.caseSensitive`
config enabled).
Note that this `ResolveDefaultColumns` rule currently depends on the whole
`InsertIntoStatement` not having been analyzed yet. So now it checks that the
new TreeNodeTag is unset.
##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -1105,6 +1105,97 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
}
}
+ test("INSERT INTO with user specified columns and defaults: positive tests")
{
+ withTable("t") {
+ sql("create table t(i boolean default true, s bigint default 42) using
parquet")
+ sql("insert into t (i, s) values (true, default)")
+ checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i =>
Row(i)))
+ }
+ withTable("t") {
+ sql("create table t(i boolean default true, s bigint default 42) using
parquet")
+ sql("insert into t (s, i) values (default, true)")
+ checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i =>
Row(i)))
+ }
+ withTable("t") {
+ sql("create table t(i boolean default true, s bigint default 42) using
parquet")
+ sql("insert into t (i) values (true)")
+ checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i =>
Row(i)))
+ }
+ withTable("t") {
+ sql("create table t(i boolean default true, s bigint default 42) using
parquet")
+ sql("insert into t (i) values (default)")
+ checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i =>
Row(i)))
+ }
+ withTable("t") {
+ sql("create table t(i boolean default true, s bigint default 42) using
parquet")
+ sql("insert into t (s) values (default)")
+ checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i =>
Row(i)))
+ }
+ withTable("t") {
+ sql("create table t(i boolean default true, s bigint default 42) using
parquet")
+ sql("insert into t (s) select default from (select 1)")
+ checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i =>
Row(i)))
+ }
+ withTable("t") {
+ sql("create table t(i boolean default true, s bigint default 42) using
parquet")
+ sql("insert into t (i) select true from (select 1)")
+ checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i =>
Row(i)))
+ }
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default 42, q int default 43)
using parquet")
+ sql("insert into t (i, q) select true from (select 1)")
+ checkAnswer(sql("select s from t where q = 43"), Seq(42L).map(i =>
Row(i)))
+ }
+ // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is
enabled, and no
+ // explicit DEFAULT value is available when the INSERT INTO statement
provides fewer
+ // values than expected, NULL values are appended in their place.
+ withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key ->
"true") {
+ withTable("t") {
+ sql("create table t(i boolean, s bigint) using parquet")
+ sql("insert into t (i) values (true)")
+ checkAnswer(sql("select s from t where i = true"), Seq(null).map(i =>
Row(i)))
+ }
+ withTable("t") {
+ sql("create table t(i boolean default true, s bigint) using parquet")
+ sql("insert into t (i) values (default)")
+ checkAnswer(sql("select s from t where i = true"), Seq(null).map(i =>
Row(i)))
+ }
+ withTable("t") {
+ sql("create table t(i boolean, s bigint default 42) using parquet")
+ sql("insert into t (s) values (default)")
+ checkAnswer(sql("select s from t where i is null"), Seq(42L).map(i =>
Row(i)))
+ }
+ }
+ }
+
+ test("INSERT INTO with user specified columns and defaults: negative tests")
{
Review Comment:
Good idea, done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -53,25 +53,49 @@ case class ResolveDefaultColumns(
// This field stores the enclosing INSERT INTO command, once we find one.
var enclosingInsert: Option[InsertIntoStatement] = None
+ // This field stores the schema of the target table of the above command.
+ var insertTableSchemaWithoutPartitionColumns: Option[StructType] = None
- override def apply(plan: LogicalPlan): LogicalPlan =
plan.resolveOperatorsWithPruning(
- (_ => SQLConf.get.enableDefaultColumns), ruleId) {
- case i@InsertIntoStatement(_, _, _, _, _, _)
- if i.query.collectFirst { case u: UnresolvedInlineTable => u }.isDefined
=>
- enclosingInsert = Some(i)
- i
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ // Initialize by clearing our reference to the enclosing INSERT INTO
command.
+ enclosingInsert = None
+ insertTableSchemaWithoutPartitionColumns = None
+ // Traverse the logical query plan in preorder (top-down).
+ plan.resolveOperatorsWithPruning(
+ (_ => SQLConf.get.enableDefaultColumns), ruleId) {
+ case i@InsertIntoStatement(_, _, _, _, _, _)
+ if i.query.collectFirst { case u: UnresolvedInlineTable => u
}.isDefined =>
+ enclosingInsert = Some(i)
+ insertTableSchemaWithoutPartitionColumns =
getInsertTableSchemaWithoutPartitionColumns
+ regenerateUserSpecifiedCols(i)
+
+ case table: UnresolvedInlineTable
+ if enclosingInsert.isDefined &&
+ table.rows.nonEmpty && table.rows.forall(_.size ==
table.rows(0).size) =>
+ val expanded: UnresolvedInlineTable =
addMissingDefaultColumnValues(table).getOrElse(table)
+ replaceExplicitDefaultColumnValues(analyzer, expanded).getOrElse(table)
+
+ case i@InsertIntoStatement(_, _, _, project: Project, _, _) =>
+ enclosingInsert = Some(i)
+ insertTableSchemaWithoutPartitionColumns =
getInsertTableSchemaWithoutPartitionColumns
+ val expanded: Project =
addMissingDefaultColumnValues(project).getOrElse(project)
+ val replaced: Option[LogicalPlan] =
replaceExplicitDefaultColumnValues(analyzer, expanded)
+ val updated: InsertIntoStatement =
+ if (replaced.isDefined) i.copy(query = replaced.get) else i
+ regenerateUserSpecifiedCols(updated)
Review Comment:
Apologies for the confusion: the `regenerateUserSpecifiedCols` method actually
returns an updated `InsertIntoStatement`. I updated the code to explicitly
assign it to a variable and return that instead. Hopefully this is more
straightforward now.
##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -1105,6 +1105,97 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
}
}
+ test("INSERT INTO with user specified columns and defaults: positive tests")
{
+ withTable("t") {
+ sql("create table t(i boolean default true, s bigint default 42) using
parquet")
+ sql("insert into t (i, s) values (true, default)")
+ checkAnswer(sql("select s from t where i = true"), Seq(42L).map(i =>
Row(i)))
Review Comment:
Thanks, this is better! I updated the existing positive tests for INSERT
INTO commands (without user-specified columns) to verify the results in this
way as well.
##########
sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala:
##########
@@ -1105,6 +1105,97 @@ class InsertSuite extends DataSourceTest with
SharedSparkSession {
}
}
+ test("INSERT INTO with user specified columns and defaults: positive tests")
{
+ withTable("t") {
+ sql("create table t(i boolean default true, s bigint default 42) using
parquet")
Review Comment:
Good point! I refactored this into a Seq of INSERT INTO statements since the
CREATE TABLE and SELECT parts are all the same for all these positive cases!
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala:
##########
@@ -239,12 +268,33 @@ case class ResolveDefaultColumns(
val lookup = catalog.lookupRelation(tableName)
lookup match {
case SubqueryAlias(_, r: UnresolvedCatalogRelation) =>
- Some(StructType(r.tableMeta.schema.fields.dropRight(
- enclosingInsert.get.partitionSpec.size)))
- case _ => None
+ StructType(r.tableMeta.schema.fields.dropRight(
+ enclosingInsert.get.partitionSpec.size))
+ case _ => return None
}
} catch {
- case _: NoSuchTableException => None
+ case _: AnalysisException => return None
Review Comment:
Good question. I found that the above `lookupRelation` call can throw other
"not found" types of exceptions, e.g. `CatalogNotFoundException`. In those
cases the analyzer would still return a reasonable-looking error message, but
it might look different depending on whether this rule executed or not. I felt
it was simpler to decouple this rule from the table lookup failure entirely and
let the previous code handle the error case. I added a comment here explaining
this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]