GitHub user gatorsmile commented on a diff in the pull request:
https://github.com/apache/spark/pull/19841#discussion_r154490891
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala ---
@@ -104,147 +105,153 @@ case class InsertIntoHiveTable(
     val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
     val partitionColumnNames = Option(partitionColumns).map(_.split("/")).getOrElse(Array.empty)
-    // By this time, the partition map must match the table's partition columns
-    if (partitionColumnNames.toSet != partition.keySet) {
-      throw new SparkException(
-        s"""Requested partitioning does not match the ${table.identifier.table} table:
-           |Requested partitions: ${partition.keys.mkString(",")}
-           |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
-    }
-
-    // Validate partition spec if there exist any dynamic partitions
-    if (numDynamicPartitions > 0) {
-      // Report error if dynamic partitioning is not enabled
-      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
+    def processInsert = {
+      // By this time, the partition map must match the table's partition columns
+      if (partitionColumnNames.toSet != partition.keySet) {
+        throw new SparkException(
+          s"""Requested partitioning does not match the ${table.identifier.table} table:
+             |Requested partitions: ${partition.keys.mkString(",")}
+             |Table partitions: ${table.partitionColumnNames.mkString(",")}""".stripMargin)
       }
-      // Report error if dynamic partition strict mode is on but no static partition is found
-      if (numStaticPartitions == 0 &&
-        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
-        throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
-      }
+      // Validate partition spec if there exist any dynamic partitions
+      if (numDynamicPartitions > 0) {
+        // Report error if dynamic partitioning is not enabled
+        if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
+          throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
+        }
+
+        // Report error if dynamic partition strict mode is on but no static partition is found
+        if (numStaticPartitions == 0 &&
+          hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
+          throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
+        }
-      // Report error if any static partition appears after a dynamic partition
-      val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
-      if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
-        throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+        // Report error if any static partition appears after a dynamic partition
+        val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
+        if (isDynamic.init.zip(isDynamic.tail).contains((true, false))) {
+          throw new AnalysisException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+        }
       }
-    }
-    table.bucketSpec match {
-      case Some(bucketSpec) =>
-        // Writes to bucketed hive tables are allowed only if user does not care about maintaining
-        // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
-        // set to false
-        val enforceBucketingConfig = "hive.enforce.bucketing"
-        val enforceSortingConfig = "hive.enforce.sorting"
+      table.bucketSpec match {
+        case Some(bucketSpec) =>
+          // Writes to bucketed hive tables are allowed only if user does not care about maintaining
+          // table's bucketing ie. both "hive.enforce.bucketing" and "hive.enforce.sorting" are
+          // set to false
+          val enforceBucketingConfig = "hive.enforce.bucketing"
+          val enforceSortingConfig = "hive.enforce.sorting"
-        val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
-          "currently does NOT populate bucketed output which is compatible with Hive."
+          val message = s"Output Hive table ${table.identifier} is bucketed but Spark" +
+            "currently does NOT populate bucketed output which is compatible with Hive."
-        if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
-            hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
-          throw new AnalysisException(message)
-        } else {
-          logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and " +
-            s"$enforceSortingConfig are set to false.")
-        }
-      case _ => // do nothing since table has no bucketing
-    }
+          if (hadoopConf.get(enforceBucketingConfig, "true").toBoolean ||
+              hadoopConf.get(enforceSortingConfig, "true").toBoolean) {
+            throw new AnalysisException(message)
+          } else {
+            logWarning(message + s" Inserting data anyways since both $enforceBucketingConfig and "
+              + s"$enforceSortingConfig are set to false.")
+          }
+        case _ => // do nothing since table has no bucketing
+      }
-    val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
-      query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
-        throw new AnalysisException(
-          s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
-      }.asInstanceOf[Attribute]
-    }
+      val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
+        query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
+          throw new AnalysisException(
+            s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
+        }.asInstanceOf[Attribute]
+      }
-    saveAsHiveFile(
-      sparkSession = sparkSession,
-      queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
-      hadoopConf = hadoopConf,
-      fileSinkConf = fileSinkConf,
-      outputLocation = tmpLocation.toString,
-      partitionAttributes = partitionAttributes)
+      saveAsHiveFile(
+        sparkSession = sparkSession,
+        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+        hadoopConf = hadoopConf,
+        fileSinkConf = fileSinkConf,
+        outputLocation = tmpLocation.toString,
+        partitionAttributes = partitionAttributes)
-    if (partition.nonEmpty) {
-      if (numDynamicPartitions > 0) {
-        externalCatalog.loadDynamicPartitions(
-          db = table.database,
-          table = table.identifier.table,
-          tmpLocation.toString,
-          partitionSpec,
-          overwrite,
-          numDynamicPartitions)
-      } else {
-        // scalastyle:off
-        // ifNotExists is only valid with static partition, refer to
-        // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
-        // scalastyle:on
-        val oldPart =
+      if (partition.nonEmpty) {
+        if (numDynamicPartitions > 0) {
+          externalCatalog.loadDynamicPartitions(
+            db = table.database,
+            table = table.identifier.table,
+            tmpLocation.toString,
+            partitionSpec,
+            overwrite,
+            numDynamicPartitions)
+        } else {
+          // scalastyle:off
+          // ifNotExists is only valid with static partition, refer to
+          // https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML#LanguageManualDML-InsertingdataintoHiveTablesfromqueries
+          // scalastyle:on
+          val oldPart =
           externalCatalog.getPartitionOption(
             table.database,
             table.identifier.table,
             partitionSpec)
-        var doHiveOverwrite = overwrite
-
-        if (oldPart.isEmpty || !ifPartitionNotExists) {
-          // SPARK-18107: Insert overwrite runs much slower than hive-client.
-          // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
-          // version and we may not want to catch up new Hive version every time. We delete the
-          // Hive partition first and then load data file into the Hive partition.
-          if (oldPart.nonEmpty && overwrite) {
-            oldPart.get.storage.locationUri.foreach { uri =>
-              val partitionPath = new Path(uri)
-              val fs = partitionPath.getFileSystem(hadoopConf)
-              if (fs.exists(partitionPath)) {
-                if (!fs.delete(partitionPath, true)) {
-                  throw new RuntimeException(
-                    "Cannot remove partition directory '" + partitionPath.toString)
+          var doHiveOverwrite = overwrite
+
+          if (oldPart.isEmpty || !ifPartitionNotExists) {
+            // SPARK-18107: Insert overwrite runs much slower than hive-client.
+            // Newer Hive largely improves insert overwrite performance. As Spark uses older Hive
+            // version and we may not want to catch up new Hive version every time. We delete the
+            // Hive partition first and then load data file into the Hive partition.
+            if (oldPart.nonEmpty && overwrite) {
+              oldPart.get.storage.locationUri.foreach { uri =>
+                val partitionPath = new Path(uri)
+                val fs = partitionPath.getFileSystem(hadoopConf)
+                if (fs.exists(partitionPath)) {
+                  if (!fs.delete(partitionPath, true)) {
+                    throw new RuntimeException(
+                      "Cannot remove partition directory '" + partitionPath.toString)
+                  }
+                  // Don't let Hive do overwrite operation since it is slower.
+                  doHiveOverwrite = false
                 }
-                // Don't let Hive do overwrite operation since it is slower.
-                doHiveOverwrite = false
              }
            }
-          }
-          // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
-          // which is currently considered as a Hive native command.
-          val inheritTableSpecs = true
-          externalCatalog.loadPartition(
-            table.database,
-            table.identifier.table,
-            tmpLocation.toString,
-            partitionSpec,
-            isOverwrite = doHiveOverwrite,
-            inheritTableSpecs = inheritTableSpecs,
-            isSrcLocal = false)
+            // inheritTableSpecs is set to true. It should be set to false for an IMPORT query
+            // which is currently considered as a Hive native command.
+            val inheritTableSpecs = true
+            externalCatalog.loadPartition(
+              table.database,
+              table.identifier.table,
+              tmpLocation.toString,
+              partitionSpec,
+              isOverwrite = doHiveOverwrite,
+              inheritTableSpecs = inheritTableSpecs,
+              isSrcLocal = false)
+          }
        }
+      } else {
+        externalCatalog.loadTable(
+          table.database,
+          table.identifier.table,
+          tmpLocation.toString, // TODO: URI
+          overwrite,
+          isSrcLocal = false)
      }
-    } else {
-      externalCatalog.loadTable(
-        table.database,
-        table.identifier.table,
-        tmpLocation.toString, // TODO: URI
-        overwrite,
-        isSrcLocal = false)
    }
-    // Attempt to delete the staging directory and the inclusive files. If failed, the files are
-    // expected to be dropped at the normal termination of VM since deleteOnExit is used.
-    deleteExternalTmpPath(hadoopConf)
+    try {
+      processInsert
--- End diff --
Please extract lines 165 to 235 into a separate function.
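
For readers following along, here is a minimal, self-contained sketch of the shape being discussed: the PR wraps the insert work in a local `processInsert` and runs the staging-directory cleanup in a `finally` block, and this comment asks for the long catalog-loading span to be pulled out into its own helper in the same spirit. This is not the actual Spark code; apart from `processInsert`, all helper names below (`validatePartitionSpec`, `writeToStaging`, `loadIntoCatalog`, `deleteStagingDir`) are illustrative placeholders.

object InsertIntoHiveTableSketch {
  // Placeholder steps standing in for the real validation / saveAsHiveFile / catalog-load logic.
  private def validatePartitionSpec(): Unit = println("validate partition spec and bucketing")
  private def writeToStaging(): Unit = println("write query output to the staging directory")
  private def loadIntoCatalog(): Unit = println("load staged files into the Hive table/partitions")
  private def deleteStagingDir(): Unit = println("delete the staging directory")

  def run(): Unit = {
    // The span the review points at would become a dedicated helper such as loadIntoCatalog().
    def processInsert(): Unit = {
      validatePartitionSpec()
      writeToStaging()
      loadIntoCatalog()
    }

    try {
      processInsert()
    } finally {
      // Cleanup runs even if processInsert() throws, which is the point of the change.
      deleteStagingDir()
    }
  }

  def main(args: Array[String]): Unit = run()
}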
---