This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cfdbfb7349a [SPARK-41726][SQL] Remove 
`OptimizedCreateHiveTableAsSelectCommand`
cfdbfb7349a is described below

commit cfdbfb7349a6c7765b0172c23f133d39196354b0
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Thu Dec 29 17:02:00 2022 -0800

    [SPARK-41726][SQL] Remove `OptimizedCreateHiveTableAsSelectCommand`
    
    ### What changes were proposed in this pull request?
    
    This PR removes `OptimizedCreateHiveTableAsSelectCommand` and moves the code 
that converts `InsertIntoHiveTable` to `InsertIntoHadoopFsRelationCommand` into 
`RelationConversions`.
    
    ### Why are the changes needed?
    
    CTAS uses a nested execution to write the data, so it is unnecessary to 
have `OptimizedCreateHiveTableAsSelectCommand`. The inner 
`InsertIntoHiveTable` is converted to `InsertIntoHadoopFsRelationCommand` 
if possible.
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
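    Updated the existing tests: the SPARK-25271 test now checks the executed 
commands through a `SparkListener`, and the SPARK-26661 explain test, which 
referenced the removed class, was deleted.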
    
    Closes #39263 from ulysses-you/SPARK-41726.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/sql/hive/HiveStrategies.scala |  32 +++++-
 .../execution/CreateHiveTableAsSelectCommand.scala | 114 ++++-----------------
 .../sql/hive/execution/HiveExplainSuite.scala      |  24 -----
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  98 ++++++++++--------
 4 files changed, 104 insertions(+), 164 deletions(-)

diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 42bf1e31bb0..af727f966e5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -28,9 +28,10 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, 
InsertIntoStatement, LogicalPlan, ScriptTransformation, Statistics}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils, 
InsertIntoDataSourceDirCommand}
-import org.apache.spark.sql.execution.datasources.{CreateTable, 
DataSourceStrategy}
+import org.apache.spark.sql.execution.datasources.{CreateTable, 
DataSourceStrategy, HadoopFsRelation, InsertIntoHadoopFsRelationCommand, 
LogicalRelation}
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.hive.execution.HiveScriptTransformationExec
 import org.apache.spark.sql.internal.HiveSerDe
@@ -232,15 +233,36 @@ case class RelationConversions(
           if DDLUtils.isHiveTable(relation.tableMeta) && 
isConvertible(relation) =>
         metastoreCatalog.convert(relation, isWrite = false)
 
-      // CTAS
-      case CreateTable(tableDesc, mode, Some(query))
+      // CTAS path
+      // This `InsertIntoHiveTable` is derived from 
`CreateHiveTableAsSelectCommand`,
+      // that only matches table insertion inside Hive CTAS.
+      // This pattern would not cause conflicts because this rule is always 
applied before
+      // `HiveAnalysis` and both of these rules are running once.
+      case InsertIntoHiveTable(tableDesc, _, query, overwrite, 
ifPartitionNotExists, _)
           if query.resolved && DDLUtils.isHiveTable(tableDesc) &&
             tableDesc.partitionColumnNames.isEmpty && isConvertible(tableDesc) 
&&
             conf.getConf(HiveUtils.CONVERT_METASTORE_CTAS) =>
         // validation is required to be done here before relation conversion.
         DDLUtils.checkTableColumns(tableDesc.copy(schema = query.schema))
-        OptimizedCreateHiveTableAsSelectCommand(
-          tableDesc, query, query.output.map(_.name), mode)
+        val hiveTable = DDLUtils.readHiveTable(tableDesc)
+        val hadoopRelation = metastoreCatalog.convert(hiveTable, isWrite = 
true) match {
+          case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
+          case _ => throw 
QueryCompilationErrors.tableIdentifierNotConvertedToHadoopFsRelationError(
+            tableDesc.identifier)
+        }
+        InsertIntoHadoopFsRelationCommand(
+          hadoopRelation.location.rootPaths.head,
+          Map.empty, // We don't support to convert partitioned table.
+          ifPartitionNotExists,
+          Seq.empty, // We don't support to convert partitioned table.
+          hadoopRelation.bucketSpec,
+          hadoopRelation.fileFormat,
+          hadoopRelation.options,
+          query,
+          if (overwrite) SaveMode.Overwrite else SaveMode.Append,
+          Some(tableDesc),
+          Some(hadoopRelation.location),
+          query.output.map(_.name))
 
       // INSERT HIVE DIR
       case InsertIntoDir(_, storage, provider, query, overwrite)
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 4dfb2cf65eb..a6d85b3f8b3 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -24,17 +24,21 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
SessionCatalog}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.errors.QueryCompilationErrors
-import org.apache.spark.sql.execution.command.{DataWritingCommand, DDLUtils, 
LeafRunnableCommand}
-import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
InsertIntoHadoopFsRelationCommand, LogicalRelation}
-import org.apache.spark.sql.hive.HiveSessionCatalog
-import org.apache.spark.util.Utils
-
-trait CreateHiveTableAsSelectBase extends LeafRunnableCommand {
-  val tableDesc: CatalogTable
-  val query: LogicalPlan
-  val outputColumnNames: Seq[String]
-  val mode: SaveMode
+import org.apache.spark.sql.execution.command.{DataWritingCommand, 
LeafRunnableCommand}
 
+/**
+ * Create table and insert the query result into it.
+ *
+ * @param tableDesc the table description, which may contain serde, storage 
handler etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param mode SaveMode
+ */
+case class CreateHiveTableAsSelectCommand(
+    tableDesc: CatalogTable,
+    query: LogicalPlan,
+    outputColumnNames: Seq[String],
+    mode: SaveMode)
+  extends LeafRunnableCommand {
   assert(query.resolved)
   override def innerChildren: Seq[LogicalPlan] = query :: Nil
 
@@ -60,9 +64,9 @@ trait CreateHiveTableAsSelectBase extends LeafRunnableCommand 
{
       val qe = sparkSession.sessionState.executePlan(command)
       qe.assertCommandExecuted()
     } else {
-        tableDesc.storage.locationUri.foreach { p =>
-          DataWritingCommand.assertEmptyRootPath(p, mode, 
sparkSession.sessionState.newHadoopConf)
-        }
+      tableDesc.storage.locationUri.foreach { p =>
+        DataWritingCommand.assertEmptyRootPath(p, mode, 
sparkSession.sessionState.newHadoopConf)
+      }
       // TODO ideally, we should get the output data ready first and then
       // add the relation into catalog, just in case of failure occurs while 
data
       // processing.
@@ -90,38 +94,7 @@ trait CreateHiveTableAsSelectBase extends 
LeafRunnableCommand {
     Seq.empty[Row]
   }
 
-  // Returns `DataWritingCommand` which actually writes data into the table.
-  def getWritingCommand(
-    catalog: SessionCatalog,
-    tableDesc: CatalogTable,
-    tableExists: Boolean): DataWritingCommand
-
-  // A subclass should override this with the Class name of the concrete type 
expected to be
-  // returned from `getWritingCommand`.
-  def writingCommandClassName: String
-
-  override def argString(maxFields: Int): String = {
-    s"[Database: ${tableDesc.database}, " +
-    s"TableName: ${tableDesc.identifier.table}, " +
-    s"${writingCommandClassName}]"
-  }
-}
-
-/**
- * Create table and insert the query result into it.
- *
- * @param tableDesc the table description, which may contain serde, storage 
handler etc.
- * @param query the query whose result will be insert into the new relation
- * @param mode SaveMode
- */
-case class CreateHiveTableAsSelectCommand(
-    tableDesc: CatalogTable,
-    query: LogicalPlan,
-    outputColumnNames: Seq[String],
-    mode: SaveMode)
-  extends CreateHiveTableAsSelectBase {
-
-  override def getWritingCommand(
+  private def getWritingCommand(
       catalog: SessionCatalog,
       tableDesc: CatalogTable,
       tableExists: Boolean): DataWritingCommand = {
@@ -136,53 +109,8 @@ case class CreateHiveTableAsSelectCommand(
       outputColumnNames = outputColumnNames)
   }
 
-  override def writingCommandClassName: String =
-    Utils.getSimpleName(classOf[InsertIntoHiveTable])
-}
-
-/**
- * Create table and insert the query result into it. This creates Hive table 
but inserts
- * the query result into it by using data source.
- *
- * @param tableDesc the table description, which may contain serde, storage 
handler etc.
- * @param query the query whose result will be insert into the new relation
- * @param mode SaveMode
- */
-case class OptimizedCreateHiveTableAsSelectCommand(
-    tableDesc: CatalogTable,
-    query: LogicalPlan,
-    outputColumnNames: Seq[String],
-    mode: SaveMode)
-  extends CreateHiveTableAsSelectBase {
-
-  override def getWritingCommand(
-      catalog: SessionCatalog,
-      tableDesc: CatalogTable,
-      tableExists: Boolean): DataWritingCommand = {
-    val metastoreCatalog = 
catalog.asInstanceOf[HiveSessionCatalog].metastoreCatalog
-    val hiveTable = DDLUtils.readHiveTable(tableDesc)
-
-    val hadoopRelation = metastoreCatalog.convert(hiveTable, isWrite = true) 
match {
-      case LogicalRelation(t: HadoopFsRelation, _, _, _) => t
-      case _ => throw 
QueryCompilationErrors.tableIdentifierNotConvertedToHadoopFsRelationError(
-        tableIdentifier)
-    }
-
-    InsertIntoHadoopFsRelationCommand(
-      hadoopRelation.location.rootPaths.head,
-      Map.empty, // We don't support to convert partitioned table.
-      false,
-      Seq.empty, // We don't support to convert partitioned table.
-      hadoopRelation.bucketSpec,
-      hadoopRelation.fileFormat,
-      hadoopRelation.options,
-      query,
-      if (tableExists) mode else SaveMode.Overwrite,
-      Some(tableDesc),
-      Some(hadoopRelation.location),
-      query.output.map(_.name))
+  override def argString(maxFields: Int): String = {
+    s"[Database: ${tableDesc.database}, " +
+      s"TableName: ${tableDesc.identifier.table}]"
   }
-
-  override def writingCommandClassName: String =
-    Utils.getSimpleName(classOf[InsertIntoHadoopFsRelationCommand])
 }
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 258b101dd21..08ebcf3e4dc 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -23,13 +23,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.parser.ParseException
 import 
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecution
-import 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand
-import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.tags.SlowHiveTest
-import org.apache.spark.util.Utils
 
 /**
  * A set of tests that validates support for Hive Explain command.
@@ -185,27 +182,6 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils 
with TestHiveSingleto
     }
   }
 
-  test("SPARK-26661: Show actual class name of the writing command in CTAS 
explain") {
-    Seq(true, false).foreach { convertCTAS =>
-      withSQLConf(
-          HiveUtils.CONVERT_METASTORE_CTAS.key -> convertCTAS.toString,
-          HiveUtils.CONVERT_METASTORE_PARQUET.key -> convertCTAS.toString) {
-
-        val df = sql(s"EXPLAIN CREATE TABLE tab1 STORED AS PARQUET AS SELECT * 
FROM range(2)")
-        val keywords = if (convertCTAS) {
-          Seq(
-            s"Execute 
${Utils.getSimpleName(classOf[OptimizedCreateHiveTableAsSelectCommand])}",
-            Utils.getSimpleName(classOf[InsertIntoHadoopFsRelationCommand]))
-        } else {
-          Seq(
-            s"Execute 
${Utils.getSimpleName(classOf[CreateHiveTableAsSelectCommand])}",
-            Utils.getSimpleName(classOf[InsertIntoHiveTable]))
-        }
-        checkKeywordsExist(df, keywords: _*)
-      }
-    }
-  }
-
   test("SPARK-28595: explain should not trigger partition listing") {
     Seq(true, false).foreach { legacyBucketedScan =>
       withSQLConf(
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 7976dab3c44..a902cb3a69e 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -27,6 +27,7 @@ import com.google.common.io.Files
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SparkException, TestUtils}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
@@ -34,10 +35,11 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogTableType, CatalogUtils, Hi
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import 
org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
-import org.apache.spark.sql.execution.TestUncaughtExceptionHandler
+import org.apache.spark.sql.execution.{SparkPlanInfo, 
TestUncaughtExceptionHandler}
 import org.apache.spark.sql.execution.adaptive.{DisableAdaptiveExecutionSuite, 
EnableAdaptiveExecutionSuite}
 import org.apache.spark.sql.execution.command.{InsertIntoDataSourceDirCommand, 
LoadDataCommand}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveExternalCatalog, HiveUtils}
 import org.apache.spark.sql.hive.test.{HiveTestJars, TestHiveSingleton}
@@ -2296,47 +2298,6 @@ abstract class SQLQuerySuiteBase extends QueryTest with 
SQLTestUtils with TestHi
     }
   }
 
-  test("SPARK-25271: Hive ctas commands should use data source if it is 
convertible") {
-    withTempView("p") {
-      Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")
-
-      Seq("orc", "parquet").foreach { format =>
-        Seq(true, false).foreach { isConverted =>
-          withSQLConf(
-            HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
-            HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
-            Seq(true, false).foreach { isConvertedCtas =>
-              withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> 
s"$isConvertedCtas") {
-
-                val targetTable = "targetTable"
-                withTable(targetTable) {
-                  val df = sql(s"CREATE TABLE $targetTable STORED AS $format 
AS SELECT id FROM p")
-                  checkAnswer(sql(s"SELECT id FROM $targetTable"),
-                    Row(1) :: Row(2) :: Row(3) :: Nil)
-
-                  val ctasDSCommand = df.queryExecution.analyzed.collect {
-                    case _: OptimizedCreateHiveTableAsSelectCommand => true
-                  }.headOption
-                  val ctasCommand = df.queryExecution.analyzed.collect {
-                    case _: CreateHiveTableAsSelectCommand => true
-                  }.headOption
-
-                  if (isConverted && isConvertedCtas) {
-                    assert(ctasDSCommand.nonEmpty)
-                    assert(ctasCommand.isEmpty)
-                  } else {
-                    assert(ctasDSCommand.isEmpty)
-                    assert(ctasCommand.nonEmpty)
-                  }
-                }
-              }
-            }
-          }
-        }
-      }
-    }
-  }
-
   test("SPARK-26181 hasMinMaxStats method of ColumnStatsMap is not correct") {
     withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
       withTable("all_null") {
@@ -2682,10 +2643,63 @@ abstract class SQLQuerySuiteBase extends QueryTest with 
SQLTestUtils with TestHi
 
 @SlowHiveTest
 class SQLQuerySuite extends SQLQuerySuiteBase with 
DisableAdaptiveExecutionSuite {
+  import spark.implicits._
+
   test("SPARK-36421: Validate all SQL configs to prevent from wrong use for 
ConfigEntry") {
     val df = spark.sql("set -v").select("Meaning")
     assert(df.collect().forall(!_.getString(0).contains("ConfigEntry")))
   }
+
+  test("SPARK-25271: Hive ctas commands should use data source if it is 
convertible") {
+    withTempView("p") {
+      Seq(1, 2, 3).toDF("id").createOrReplaceTempView("p")
+
+      Seq("orc", "parquet").foreach { format =>
+        Seq(true, false).foreach { isConverted =>
+          withSQLConf(
+            HiveUtils.CONVERT_METASTORE_ORC.key -> s"$isConverted",
+            HiveUtils.CONVERT_METASTORE_PARQUET.key -> s"$isConverted") {
+            Seq(true, false).foreach { isConvertedCtas =>
+              withSQLConf(HiveUtils.CONVERT_METASTORE_CTAS.key -> 
s"$isConvertedCtas") {
+
+                val targetTable = "targetTable"
+                withTable(targetTable) {
+                  var commands: Seq[SparkPlanInfo] = Seq.empty
+                  val listener = new SparkListener {
+                    override def onOtherEvent(event: SparkListenerEvent): Unit 
= {
+                      event match {
+                        case start: SparkListenerSQLExecutionStart =>
+                          commands = commands ++ Seq(start.sparkPlanInfo)
+                        case _ => // ignore other events
+                      }
+                    }
+                  }
+                  spark.sparkContext.addSparkListener(listener)
+                  try {
+                    sql(s"CREATE TABLE $targetTable STORED AS $format AS 
SELECT id FROM p")
+                    checkAnswer(sql(s"SELECT id FROM $targetTable"),
+                      Row(1) :: Row(2) :: Row(3) :: Nil)
+                    spark.sparkContext.listenerBus.waitUntilEmpty()
+                    assert(commands.size == 3)
+                    assert(commands.head.nodeName == "Execute 
CreateHiveTableAsSelectCommand")
+
+                    val v1WriteCommand = commands(1)
+                    if (isConverted && isConvertedCtas) {
+                      assert(v1WriteCommand.nodeName == "Execute 
InsertIntoHadoopFsRelationCommand")
+                    } else {
+                      assert(v1WriteCommand.nodeName == "Execute 
InsertIntoHiveTable")
+                    }
+                  } finally {
+                    spark.sparkContext.removeSparkListener(listener)
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
 }
 @SlowHiveTest
 class SQLQuerySuiteAE extends SQLQuerySuiteBase with 
EnableAdaptiveExecutionSuite


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to