This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new cfdbfb7349a  [SPARK-41726][SQL] Remove `OptimizedCreateHiveTableAsSelectCommand`
cfdbfb7349a is described below

commit cfdbfb7349a6c7765b0172c23f133d39196354b0
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Thu Dec 29 17:02:00 2022 -0800

    [SPARK-41726][SQL] Remove `OptimizedCreateHiveTableAsSelectCommand`

    ### What changes were proposed in this pull request?

    This PR removes `OptimizedCreateHiveTableAsSelectCommand` and moves the code that
    converts `InsertIntoHiveTable` to `InsertIntoHadoopFsRelationCommand` into
    `RelationConversions`.

    ### Why are the changes needed?

    CTAS uses a nested execution to do the data writing, so
    `OptimizedCreateHiveTableAsSelectCommand` is unnecessary: the nested
    `InsertIntoHiveTable` is converted to `InsertIntoHadoopFsRelationCommand` where possible.

    ### Does this PR introduce _any_ user-facing change?

    no

    ### How was this patch tested?

    Fixed existing tests.

    Closes #39263 from ulysses-you/SPARK-41726.

    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/sql/hive/HiveStrategies.scala |  32 +++++-
 .../execution/CreateHiveTableAsSelectCommand.scala | 114 ++++-----------------
 .../sql/hive/execution/HiveExplainSuite.scala      |  24 -----
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  98 ++++++++++--------
 4 files changed, 104 insertions(+), 164 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 42bf1e31bb0..af727f966e5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -28,9 +28,10 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils, InsertIntoDataSourceDirCommand}
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSourceStrategy, HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
 import org.apache.spark.sql.internal.HiveSerDe
@@ -232,15 +233,36 @@ case class RelationConversions(
         if DDLUtils.isHiveTable(relation.tableMeta) && isConvertible(relation) =>
       metastoreCatalog.convert(relation, isWrite = false)

-    // CTAS
-    case CreateTable(tableDesc, mode, Some(query))
+    // CTAS path
+    // This `InsertIntoHiveTable` is derived from `CreateHiveTableAsSelectCommand`,
+    // so it only matches table insertions inside a Hive CTAS.
+    // This pattern causes no conflicts because this rule is always applied before
+    // `HiveAnalysis` and both of these rules run only once.
+    case InsertIntoHiveTable(tableDesc, _, query, overwrite, ifPartitionNotExists, _)
         if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
           tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) &&
           conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
       // validation is required to be done here before relation conversion.
       DDLUtils.checkTableColumns(tableDesc.copy(schema = query.schema))
-      OptimizedCreateHiveTableAsSelectCommand(
-        tableDesc, query, query.output.map(_.name), mode)
+      val hiveTable = DDLUtils.readHiveTable(tableDesc)
+      val hadoopRelation = metastoreCatalog.convert(hiveTable, isWrite = true) match {
+        case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
+        case _ => throw QueryCompilationErrors.tableIdentifierNotConvertedToHadoopFsRelationError(
+          tableDesc.identifier)
+      }
+      InsertIntoHadoopFsRelationCommand(
+        hadoopRelation.location.rootPaths.head,
+        Map.empty, // Converting partitioned tables is not supported.
+        ifPartitionNotExists,
+        Seq.empty, // Converting partitioned tables is not supported.
+        hadoopRelation.bucketSpec,
+        hadoopRelation.fileFormat,
+        hadoopRelation.options,
+        query,
+        if (overwrite) SaveMode.Overwrite else SaveMode.Append,
+        Some(tableDesc),
+        Some(hadoopRelation.location),
+        query.output.map(_.name))

     // INSERT HIVE DIR
     case InsertIntoDir(_, storage, provider, query, overwrite)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 4dfb2cf65eb..a6d85b3f8b3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -24,17 +24,21 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SessionCatalog}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils, LeafRunnableCommand}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, InsertIntoHadoopFsRelationCommand, LogicalRelation}
-import org.apache.spark.sql.hive.HiveSessionCatalog
-import org.apache.spark.util.Utils
-
-trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
-  val tableDesc: CatalogTable
-  val query: LogicalPlan
-  val outputColumnNames: Seq[String]
-  val mode: SaveMode
+import org.apache.spark.sql.execution.command.{DataWritingCommand, LeafRunnableCommand}

+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, storage handler etc.
+ * @param query the query whose result will be inserted into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends LeafRunnableCommand {

   assert(query.resolved)
   override def innerChildren: Seq[LogicalPlan] = query :: Nil
@@ -60,9 +64,9 @@ trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
         val qe = sparkSession.sessionState.executePlan(command)
         qe.assertCommandExecuted()
       } else {
-        tableDesc.storage.locationUri.foreach { p =>
-          DataWritingCommand.assertEmptyRootPath(p, mode, sparkSession.sessionState.newHadoopConf)
-        }
+      tableDesc.storage.locationUri.foreach { p =>
+        DataWritingCommand.assertEmptyRootPath(p, mode, sparkSession.sessionState.newHadoopConf)
+      }
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while data
       // processing.
@@ -90,38 +94,7 @@ trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
     Seq.empty[Row]
   }

-  // Returns `DataWritingCommand` which actually writes data into the table.
-  def getWritingCommand(
-      catalog: SessionCatalog,
-      tableDesc: CatalogTable,
-      tableExists: Boolean): DataWritingCommand
-
-  // A subclass should override this with the Class name of the concrete type expected to be
-  // returned from `getWritingCommand`.
-  def writingCommandClassName: String
-
-  override def argString(maxFields: Int): String = {
-    s"[Database: ${tableDesc.database}, " +
-    s"TableName: ${tableDesc.identifier.table}, " +
-    s"${writingCommandClassName}]"
-  }
-}
-
-/**
- * Create table and insert the query result into it.
- *
- * @param tableDesc the table description, which may contain serde, storage handler etc.
- * @param query the query whose result will be insert into the new relation
- * @param mode SaveMode
- */
-case class CreateHiveTableAsSelectCommand(
-    tableDesc: CatalogTable,
-    query: LogicalPlan,
-    outputColumnNames: Seq[String],
-    mode: SaveMode)
-  extends CreateHiveTableAsSelectBase {
-
-  override def getWritingCommand(
+  private def getWritingCommand(
       catalog: SessionCatalog,
       tableDesc: CatalogTable,
       tableExists: Boolean): DataWritingCommand = {
@@ -136,53 +109,8 @@ case class CreateHiveTableAsSelectCommand(
       outputColumnNames = outputColumnNames)
   }

-  override def writingCommandClassName: String =
-    Utils.getSimpleName(classOf[InsertIntoHiveTable])
-}
-
-/**
- * Create table and insert the query result into it. This creates Hive table but inserts
- * the query result into it by using data source.
- *
- * @param tableDesc the table description, which may contain serde, storage handler etc.
- * @param query the query whose result will be insert into the new relation
- * @param mode SaveMode
- */
-case class OptimizedCreateHiveTableAsSelectCommand(
-    tableDesc: CatalogTable,
-    query: LogicalPlan,
-    outputColumnNames: Seq[String],
-    mode: SaveMode)
-  extends CreateHiveTableAsSelectBase {
-
-  override def getWritingCommand(
-      catalog: SessionCatalog,
-      tableDesc: CatalogTable,
-      tableExists: Boolean): DataWritingCommand = {
-    val metastoreCatalog = catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
-    val hiveTable = DDLUtils.readHiveTable(tableDesc)
-
-    val hadoopRelation = metastoreCatalog.convert(hiveTable, isWrite = true) match {
-      case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
-      case _ => throw QueryCompilationErrors.tableIdentifierNotConvertedToHadoopFsRelationError(
-        tableIdentifier)
-    }
-
-    InsertIntoHadoopFsRelationCommand(
-      hadoopRelation.location.rootPaths.head,
-      Map.empty, // We don't support to convert partitioned table.
-      false,
-      Seq.empty, // We don't support to convert partitioned table.
-      hadoopRelation.bucketSpec,
-      hadoopRelation.fileFormat,
-      hadoopRelation.options,
-      query,
-      if (tableExists) mode else SaveMode.Overwrite,
-      Some(tableDesc),
-      Some(hadoopRelation.location),
-      query.output.map(_.name))
+  override def argString(maxFields: Int): String = {
+    s"[Database: ${tableDesc.database}, " +
+    s"TableName: ${tableDesc.identifier.table}]"
   }
-
-  override def writingCommandClassName: String =
-    Utils.getSimpleName(classOf[InsertIntoHadoopFsRelationCommand])
 }

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 258b101dd21..08ebcf3e4dc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -23,13 +23,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
-import org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.tags.SlowHiveTest
-import org.apache.spark.util.Utils

 /**
  * A set of tests that validates support for Hive Explain command.
@@ -185,27 +182,6 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
     }
   }

-  test("SPARK-26661: Show actual class name of the writing command in CTAS explain") {
-    Seq(true, false).foreach { convertCTAS =>
-      withSQLConf(
-          HiveUtils.CONVERT_METASTORE_CTAS.key -> convertCTAS.toString,
-          HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertCTAS.toString) {
-
-        val df = sql(s"EXPLAIN CREATE TABLE tab1 STORED AS PARQUET AS SELECT * FROM range(2)")
-        val keywords = if (convertCTAS) {
-          Seq(
-            s"Execute ${Utils.getSimpleName(classOf[OptimizedCreateHiveTableAsSelectCommand])}",
-            Utils.getSimpleName(classOf[InsertIntoHadoopFsRelationCommand]))
-        } else {
-          Seq(
-            s"Execute ${Utils.getSimpleName(classOf[CreateHiveTableAsSelectCommand])}",
-            Utils.getSimpleName(classOf[InsertIntoHiveTable]))
-        }
-        checkKeywordsExist(df, keywords: _*)
-      }
-    }
-  }
-
   test("SPARK-28595: explain should not trigger partition listing") {
     Seq(true, false).foreach { legacyBucketedScan =>
       withSQLConf(

diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 7976dab3c44..a902cb3a69e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -27,6 +27,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.{FileSystem, Path}

 import org.apache.spark.{SparkException, TestUtils}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
@@ -34,10 +35,11 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, Hi
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.execution.TestUncaughtExceptionHandler
+import org.apache.spark.sql.execution.{SparkPlanInfo, TestUncaughtExceptionHandler}
 import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.command.{InsertIntoDataSourceDirCommand, LoadDataCommand}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.test.{HiveTestJars, TestHiveSingleton}
@@ -2296,47 +2298,6 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi
     }
   }

-  test("SPARK-25271: Hive ctas commands should use data source if it is convertible") {
-    withTempView("p") {
-      Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")
-
-      Seq("orc", "parquet").foreach { format =>
-        Seq(true, false).foreach { isConverted =>
-          withSQLConf(
-              HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
-              HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
-            Seq(true, false).foreach { isConvertedCtas =>
-              withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> s"$isConvertedCtas") {
-
-                val targetTable = "targetTable"
-                withTable(targetTable) {
-                  val df = sql(s"CREATE TABLE $targetTable STORED AS $format AS SELECT id FROM p")
-                  checkAnswer(sql(s"SELECT id FROM $targetTable"),
-                    Row(1) :: Row(2) :: Row(3) :: Nil)
-
-                  val ctasDSCommand = df.queryExecution.analyzed.collect {
-                    case _: OptimizedCreateHiveTableAsSelectCommand => true
-                  }.headOption
-                  val ctasCommand = df.queryExecution.analyzed.collect {
-                    case _: CreateHiveTableAsSelectCommand => true
-                  }.headOption
-
-                  if (isConverted && isConvertedCtas) {
-                    assert(ctasDSCommand.nonEmpty)
-                    assert(ctasCommand.isEmpty)
-                  } else {
-                    assert(ctasDSCommand.isEmpty)
-                    assert(ctasCommand.nonEmpty)
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
   test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not correct") {
     withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
       withTable("all_null") {
@@ -2682,10 +2643,63 @@ abstract class SQLQuerySuiteBase extends QueryTest with SQLTestUtils with TestHi

 @SlowHiveTest
 class SQLQuerySuite extends SQLQuerySuiteBase with DisableAdaptiveExecutionSuite {
+  import spark.implicits._
+
   test("SPARK-36421: Validate all SQL configs to prevent from wrong use for ConfigEntry") {
     val df = spark.sql("set -v").select("Meaning")
     assert(df.collect().forall(!_.getString(0).contains("ConfigEntry")))
   }
+
+  test("SPARK-25271: Hive ctas commands should use data source if it is convertible") {
+    withTempView("p") {
+      Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")
+
+      Seq("orc", "parquet").foreach { format =>
+        Seq(true, false).foreach { isConverted =>
+          withSQLConf(
+              HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
+              HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
+            Seq(true, false).foreach { isConvertedCtas =>
+              withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> s"$isConvertedCtas") {
+
+                val targetTable = "targetTable"
+                withTable(targetTable) {
+                  var commands: Seq[SparkPlanInfo] = Seq.empty
+                  val listener = new SparkListener {
+                    override def onOtherEvent(event: SparkListenerEvent): Unit = {
+                      event match {
+                        case start: SparkListenerSQLExecutionStart =>
+                          commands = commands ++ Seq(start.sparkPlanInfo)
+                        case _ => // ignore other events
+                      }
+                    }
+                  }
+                  spark.sparkContext.addSparkListener(listener)
+                  try {
+                    sql(s"CREATE TABLE $targetTable STORED AS $format AS SELECT id FROM p")
+                    checkAnswer(sql(s"SELECT id FROM $targetTable"),
+                      Row(1) :: Row(2) :: Row(3) :: Nil)
+                    spark.sparkContext.listenerBus.waitUntilEmpty()
+                    assert(commands.size == 3)
+                    assert(commands.head.nodeName == "Execute CreateHiveTableAsSelectCommand")
+
+                    val v1WriteCommand = commands(1)
+                    if (isConverted && isConvertedCtas) {
+                      assert(v1WriteCommand.nodeName == "Execute InsertIntoHadoopFsRelationCommand")
+                    } else {
+                      assert(v1WriteCommand.nodeName == "Execute InsertIntoHiveTable")
+                    }
+                  } finally {
+                    spark.sparkContext.removeSparkListener(listener)
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }

 @SlowHiveTest
 class SQLQuerySuiteAE extends SQLQuerySuiteBase with EnableAdaptiveExecutionSuite