This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 52a2e63dd714 [SPARK-47850][SQL] Support `spark.sql.hive.convertInsertingUnpartitionedTable`
52a2e63dd714 is described below

commit 52a2e63dd7147e2701c9c26667fe5bd9fdc3f14c
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Thu Apr 18 15:05:15 2024 +0800

    [SPARK-47850][SQL] Support `spark.sql.hive.convertInsertingUnpartitionedTable`

    ### What changes were proposed in this pull request?

    This PR introduces a new configuration,
    `spark.sql.hive.convertInsertingUnpartitionedTable`, alongside the existing
    `spark.sql.hive.convertInsertingPartitionedTable`, to allow fine-grained
    switching between Hive SerDe and the built-in data source when inserting
    into Parquet/ORC Hive tables.

    ### Why are the changes needed?

    In hybrid workloads (Hive tables may be read/written by Hive, Spark,
    Impala, etc.), we usually use the built-in data source for reading
    Parquet/ORC tables but Hive SerDe for writing. The current configuration
    combination lets us achieve that in every case except unpartitioned
    tables.
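    For context, a minimal sketch of the hybrid combination described above,
    as it could be set in spark-shell (illustrative values only; the last
    flag is the one introduced by this patch):

    ```scala
    // Read Parquet/ORC Hive tables through the built-in data sources...
    spark.conf.set("spark.sql.hive.convertMetastoreParquet", "true")
    spark.conf.set("spark.sql.hive.convertMetastoreOrc", "true")
    // ...but keep Hive SerDe for inserts, partitioned or not.
    spark.conf.set("spark.sql.hive.convertInsertingPartitionedTable", "false")
    spark.conf.set("spark.sql.hive.convertInsertingUnpartitionedTable", "false")
    ```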
    ### Does this PR introduce _any_ user-facing change?

    No. The newly added configuration
    `spark.sql.hive.convertInsertingUnpartitionedTable` defaults to `true`,
    which preserves the existing behavior.

    ### How was this patch tested?

    A new unit test is added.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #46052 from pan3793/SPARK-47850.

    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../plans/logical/basicLogicalOperators.scala      |  1 +
 .../apache/spark/sql/execution/command/views.scala |  1 +
 .../org/apache/spark/sql/hive/HiveStrategies.scala | 10 ++++--
 .../org/apache/spark/sql/hive/HiveUtils.scala      | 10 ++++++
 .../execution/CreateHiveTableAsSelectCommand.scala |  5 ++-
 .../sql/hive/execution/InsertIntoHiveTable.scala   |  7 ++++
 .../spark/sql/hive/orc/HiveOrcQuerySuite.scala     | 37 ++++++++++++++++++++++
 7 files changed, 67 insertions(+), 4 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 1c8f7a97dd7f..7c36e3bc79af 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -783,6 +783,7 @@ object View {
       "spark.sql.hive.convertMetastoreParquet",
       "spark.sql.hive.convertMetastoreOrc",
       "spark.sql.hive.convertInsertingPartitionedTable",
+      "spark.sql.hive.convertInsertingUnpartitionedTable",
       "spark.sql.hive.convertMetastoreCtas"
     ).contains(key) || key.startsWith("spark.sql.catalog.")
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index d71d0d43683c..cb5e7e7f42d2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -360,6 +360,7 @@ object ViewHelper extends SQLConfHelper with Logging {
     "spark.sql.hive.convertMetastoreParquet",
     "spark.sql.hive.convertMetastoreOrc",
     "spark.sql.hive.convertInsertingPartitionedTable",
+    "spark.sql.hive.convertInsertingUnpartitionedTable",
     "spark.sql.hive.convertMetastoreCtas",
     SQLConf.ADDITIONAL_REMOTE_REPOSITORIES.key)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5972a9df78ec..e74cc088a1f6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils, Ins
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy, HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
+import org.apache.spark.sql.hive.execution.InsertIntoHiveTable.BY_CTAS
 import org.apache.spark.sql.internal.HiveSerDe
@@ -194,6 +195,8 @@ object HiveAnalysis extends Rule[LogicalPlan] {
  *  - When writing to non-partitioned Hive-serde Parquet/Orc tables
  *  - When writing to partitioned Hive-serde Parquet/Orc tables when
  *    `spark.sql.hive.convertInsertingPartitionedTable` is true
+ *  - When writing to unpartitioned Hive-serde Parquet/Orc tables when
+ *    `spark.sql.hive.convertInsertingUnpartitionedTable` is true
  *  - When writing to directory with Hive-serde
  *  - When writing to non-partitioned Hive-serde Parquet/ORC tables using CTAS
  *  - When scanning Hive-serde Parquet/ORC tables
@@ -230,7 +233,8 @@ case class RelationConversions(
     case InsertIntoStatement(
         r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists, byName)
       if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
-        (!r.isPartitioned || conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)) &&
+        ((r.isPartitioned && conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)) ||
+          (!r.isPartitioned && conf.getConf(HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE))) &&
         isConvertible(r) =>
       InsertIntoStatement(metastoreCatalog.convert(r, isWrite = true), partition,
         cols, query, overwrite, ifPartitionNotExists, byName)
@@ -245,11 +249,11 @@ case class RelationConversions(
       // that only matches table insertion inside Hive CTAS.
       // This pattern would not cause conflicts because this rule is always applied before
       // `HiveAnalysis` and both of these rules are running once.
-      case InsertIntoHiveTable(
+      case i @ InsertIntoHiveTable(
           tableDesc, _, query, overwrite, ifPartitionNotExists, _, _, _, _, _, _)
         if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
           tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) &&
-          conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
+          conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) && i.getTagValue(BY_CTAS).isDefined =>
         // validation is required to be done here before relation conversion.
         DDLUtils.checkTableColumns(tableDesc.copy(schema = query.schema))
         val hiveTable = DDLUtils.readHiveTable(tableDesc)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
index 3fc761785acc..5f59e9ca95f9 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveUtils.scala
@@ -154,6 +154,16 @@ private[spark] object HiveUtils extends Logging {
     .booleanConf
     .createWithDefault(true)

+  val CONVERT_INSERTING_UNPARTITIONED_TABLE =
+    buildConf("spark.sql.hive.convertInsertingUnpartitionedTable")
+      .doc("When set to true, and `spark.sql.hive.convertMetastoreParquet` or " +
+        "`spark.sql.hive.convertMetastoreOrc` is true, the built-in ORC/Parquet writer is used " +
+        "to process inserting into unpartitioned ORC/Parquet tables created by using the HiveSQL " +
+        "syntax.")
+      .version("4.0.0")
+      .booleanConf
+      .createWithDefault(true)
+
 val CONVERT_METASTORE_CTAS = buildConf("spark.sql.hive.convertMetastoreCtas")
     .doc("When set to true, Spark will try to use built-in data source writer " +
       "instead of Hive serde in CTAS. This flag is effective only if " +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 811d186b17d2..154d07f80d89 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{CTEInChildren, CTERelationDe
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand}
+import org.apache.spark.sql.hive.execution.InsertIntoHiveTable.BY_CTAS

 /**
  * Create table and insert the query result into it.
@@ -98,13 +99,15 @@ case class CreateHiveTableAsSelectCommand(
       tableExists: Boolean): DataWritingCommand = {
     // For CTAS, there is no static partition values to insert.
     val partition = tableDesc.partitionColumnNames.map(_ -> None).toMap
-    InsertIntoHiveTable(
+    val insertHive = InsertIntoHiveTable(
       tableDesc,
       partition,
       query,
       overwrite = false,
       ifPartitionNotExists = false,
       outputColumnNames = outputColumnNames)
+    insertHive.setTagValue(BY_CTAS, ())
+    insertHive
   }

   override def argString(maxFields: Int): String = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 4a92bfd84040..cf296e8be4f1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.TreeNodeTag
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.execution.SparkPlan
@@ -235,6 +236,12 @@ case class InsertIntoHiveTable(
 }

 object InsertIntoHiveTable extends V1WritesHiveUtils {
+
+  /**
+   * A tag to identify if this command is created by a CTAS.
+   */
+  val BY_CTAS = TreeNodeTag[Unit]("by_ctas")
+
   def apply(
       table: CatalogTable,
       partition: Map[String, Option[String]],
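Aside: `TreeNodeTag` is Catalyst's mechanism for attaching typed, out-of-band
metadata to a plan node, which is how `BY_CTAS` marks an `InsertIntoHiveTable`
created by CTAS without changing its constructor. A standalone sketch of the
mechanism (illustrative only, not part of this patch; `OneRowRelation` is just
a convenient leaf plan to tag):

```scala
import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation
import org.apache.spark.sql.catalyst.trees.TreeNodeTag

// A tag is a typed key; Unit suffices when only presence matters.
val byCtas = TreeNodeTag[Unit]("by_ctas")

val plan = OneRowRelation()
plan.setTagValue(byCtas, ())                // mark the node, as CreateHiveTableAsSelectCommand does
assert(plan.getTagValue(byCtas).isDefined)  // check the mark, as RelationConversions does
```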
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
index e52d9b639dc4..610fc246cd84 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcQuerySuite.scala
@@ -284,6 +284,43 @@ class HiveOrcQuerySuite extends OrcQueryTest with TestHiveSingleton {
     }
   }

+  test("SPARK-47850 ORC conversion could be applied for unpartitioned table insertion") {
+    withTempView("single") {
+      val singleRowDF = Seq((0, "foo")).toDF("key", "value")
+      singleRowDF.createOrReplaceTempView("single")
+      Seq("true", "false").foreach { conversion =>
+        withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true",
+          HiveUtils.CONVERT_INSERTING_UNPARTITIONED_TABLE.key -> conversion) {
+          withTable("dummy_orc_unpartitioned") {
+            spark.sql(
+              s"""
+                 |CREATE TABLE dummy_orc_unpartitioned(key INT, value STRING)
+                 |STORED AS ORC
+               """.stripMargin)
+
+            spark.sql(
+              s"""
+                 |INSERT INTO TABLE dummy_orc_unpartitioned
+                 |SELECT key, value FROM single
+               """.stripMargin)
+
+            val orcUnpartitionedTable = TableIdentifier("dummy_orc_unpartitioned", Some("default"))
+            if (conversion == "true") {
+              // if converted, we refresh the cached relation.
+              assert(getCachedDataSourceTable(orcUnpartitionedTable) === null)
+            } else {
+              // otherwise, not cached.
+              assert(getCachedDataSourceTable(orcUnpartitionedTable) === null)
+            }
+
+            val df = spark.sql("SELECT key, value FROM dummy_orc_unpartitioned WHERE key=0")
+            checkAnswer(df, singleRowDF)
+          }
+        }
+      }
+    }
+  }
+
   test("SPARK-32234 read ORC table with column names all starting with '_col'") {
     Seq("native", "hive").foreach { orcImpl =>
       Seq("false", "true").foreach { vectorized =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org