[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1064238626 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala: ## @@ -294,3 +285,40 @@ case class InsertIntoHiveTable( override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoHiveTable = copy(query = newChild) } + +object InsertIntoHiveTable extends V1WritesHiveUtils with Logging { Review Comment: Oh, I forgot to clean this up. I will remove it when I touch the related code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1062552806 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala: ## @@ -92,29 +98,17 @@ case class InsertIntoHiveTable( */ override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { val externalCatalog = sparkSession.sharedState.externalCatalog -val hadoopConf = sparkSession.sessionState.newHadoopConf() - -val hiveQlTable = HiveClientImpl.toHiveTable(table) -// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer -// instances within the closure, since Serializer is not serializable while TableDesc is. -val tableDesc = new TableDesc( - hiveQlTable.getInputFormatClass, - // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because - // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to - // substitute some output formats, e.g. substituting SequenceFileOutputFormat to - // HiveSequenceFileOutputFormat. - hiveQlTable.getOutputFormatClass, - hiveQlTable.getMetadata -) -val tableLocation = hiveQlTable.getDataLocation -val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation) +val hadoopConf = externalTmpPath.hadoopConf +val stagingDir = externalTmpPath.stagingDir +val tmpLocation = externalTmpPath.externalTempPath +createExternalTmpPath(stagingDir, hadoopConf) try { - processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child) + processInsert(sparkSession, externalCatalog, hadoopConf, tmpLocation, child) } finally { // Attempt to delete the staging directory and the inclusive files. If failed, the files are // expected to be dropped at the normal termination of VM since deleteOnExit is used. - deleteExternalTmpPath(hadoopConf) + deleteExternalTmpPath(stagingDir, hadoopConf) Review Comment: addressed -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1062551972 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala: ## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.hive.execution + +import java.io.IOException +import java.net.URI +import java.text.SimpleDateFormat +import java.util.{Date, Locale, Random} + +import scala.util.control.NonFatal + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.hadoop.hive.common.FileUtils +import org.apache.hadoop.hive.ql.exec.TaskRunner + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.errors.QueryExecutionErrors +import org.apache.spark.sql.hive.HiveExternalCatalog +import org.apache.spark.sql.hive.client.HiveVersion + +class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: Path) + extends Logging { + private var stagingDirForCreating: Option[Path] = None + + lazy val externalTempPath: Path = getExternalTmpPath(path) + + private def getExternalTmpPath(path: Path): Path = { +import org.apache.spark.sql.hive.client.hive._ + +// Before Hive 1.1, when inserting into a table, Hive will create the staging directory under +// a common scratch directory. After the writing is finished, Hive will simply empty the table +// directory and move the staging directory to it. +// After Hive 1.1, Hive will create the staging directory under the table directory, and when +// moving staging directory to table directory, Hive will still empty the table directory, but +// will exclude the staging directory there. +// We have to follow the Hive behavior here, to avoid troubles. For example, if we create +// staging directory under the table director for Hive prior to 1.1, the staging directory will +// be removed by Hive when Hive is trying to empty the table directory. +val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0) +val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] = + Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1) + +// Ensure all the supported versions are considered here. 
+assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath == + allSupportedHiveVersions) + +val externalCatalog = session.sharedState.externalCatalog +val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version +val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging") +val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive") + +if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) { + oldVersionExternalTempPath(path, scratchDir) +} else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) { + newVersionExternalTempPath(path, stagingDir) +} else { + throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion) +} + } + + // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 + private def oldVersionExternalTempPath(path: Path, scratchDir: String): Path = { +val extURI: URI = path.toUri +val scratchPath = new Path(scratchDir, executionId) +var dirPath = new Path( + extURI.getScheme, + extURI.getAuthority, + scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID()) + +val fs = dirPath.getFileSystem(hadoopConf) +dirPath = new Path(fs.makeQualified(dirPath).toString()) +stagingDirForCreating = Some(dirPath) +dirPath + } + + // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 + private def newVersionExternalTempPath(path: Path, stagingDir: String): Path = { +val extURI: URI = path.toUri +if (extURI.getScheme == "viewfs") { + val qualifiedStagingDir = getStagingDir(path, stagingDir) + stagingDirForCreating = Some(qualifiedStagingDir) + // Hive uses 1 + new Path(qualifiedStagingDir, "-ext-1") +} else { + val qualifiedStagingDir =
[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1062052748 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala: ## @@ -105,4 +112,164 @@ trait V1WritesHiveUtils { .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true")) .getOrElse(Map.empty) } + + def setupCompression( + fileSinkConf: FileSinkDesc, + hadoopConf: Configuration, + sparkSession: SparkSession): Unit = { +val isCompressed = + fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match { +case formatName if formatName.endsWith("orcoutputformat") => + // For ORC,"mapreduce.output.fileoutputformat.compress", + // "mapreduce.output.fileoutputformat.compress.codec", and + // "mapreduce.output.fileoutputformat.compress.type" + // have no impact because it uses table properties to store compression information. + false +case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean + } + +if (isCompressed) { + hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(hadoopConf +.get("mapreduce.output.fileoutputformat.compress.codec")) + fileSinkConf.setCompressType(hadoopConf +.get("mapreduce.output.fileoutputformat.compress.type")) +} else { + // Set compression by priority + HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf) +.foreach { case (compression, codec) => hadoopConf.set(compression, codec) } +} + } + + /** + * Return two paths: + * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath` + * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-1` + * The call side should create `stagingDir` before using `externalTmpPath` and + * delete `stagingDir` at the end. 
Review Comment: wrapped using `HiveTempPath` since it would be used by `InsertIntoHiveDirCommand` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1062020277 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala: ## @@ -53,13 +59,17 @@ case class WriteFiles(child: LogicalPlan) extends UnaryNode { /** * Responsible for writing files. */ -case class WriteFilesExec(child: SparkPlan) extends UnaryExecNode { +case class WriteFilesExec( +child: SparkPlan, +fileFormat: FileFormat, +partitionColumns: Seq[Attribute], +bucketSpec: Option[BucketSpec], +options: Map[String, String], +staticPartitions: TablePartitionSpec) extends UnaryExecNode { override def output: Seq[Attribute] = Seq.empty - override protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = { -assert(writeSpec.isInstanceOf[WriteFilesSpec]) -val writeFilesSpec: WriteFilesSpec = writeSpec.asInstanceOf[WriteFilesSpec] - + override protected def doExecuteWrite( + writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = { Review Comment: It seems a bit hard. Look at the current information: ```scala case class WriteFilesSpec( description: WriteJobDescription, committer: FileCommitProtocol, concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec]) ``` - `ConcurrentOutputWriterSpec` and `FileCommitProtocol` contain the output spec, so we cannot replace them - `WriteJobDescription` contains a lot of information, including what we pull out, but if we want to remove something from `WriteJobDescription`, we need to create a new class to hold the rest. I'm not sure it's worth doing that. 
```scala class WriteJobDescription( val uuid: String, val serializableHadoopConf: SerializableConfiguration, val outputWriterFactory: OutputWriterFactory, val allColumns: Seq[Attribute], val dataColumns: Seq[Attribute], val partitionColumns: Seq[Attribute], val bucketSpec: Option[WriterBucketSpec], val path: String, val customPartitionLocations: Map[TablePartitionSpec, String], val maxRecordsPerFile: Long, val timeZoneId: String, val statsTrackers: Seq[WriteJobStatsTracker]) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1059246929 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala: ## @@ -38,13 +38,18 @@ case class WriteFilesSpec( description: WriteJobDescription, committer: FileCommitProtocol, concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec]) - extends WriteSpec /** * During Optimizer, [[V1Writes]] injects the [[WriteFiles]] between [[V1WriteCommand]] and query. * [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]]. */ -case class WriteFiles(child: LogicalPlan) extends UnaryNode { +case class WriteFiles( +child: LogicalPlan, +fileFormat: FileFormat, +partitionColumns: Seq[Attribute], +bucketSpec: Option[BucketSpec], +options: Map[String, String], +requiredOrdering: Seq[SortOrder]) extends UnaryNode { Review Comment: How about pulling out `partitionSpec` instead? `partitionColumns` does not contain the insertion partition spec information. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1059246404 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala: ## @@ -73,16 +74,18 @@ case class InsertIntoHiveTable( query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean, -outputColumnNames: Seq[String] +outputColumnNames: Seq[String], +partitionColumns: Seq[Attribute], +bucketSpec: Option[BucketSpec], +options: Map[String, String], +fileFormat: FileFormat, +externalTmpPath: String, +@transient stagingDir: Path, Review Comment: for old hive version, externalTmpPath and stagingDir are the same. https://github.com/apache/spark/blob/a3c837ae2eaf2c7ba08563b7afa0f96df8a4e80b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala#L129-L136 for new hive version: https://github.com/apache/spark/blob/a3c837ae2eaf2c7ba08563b7afa0f96df8a4e80b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala#L189-L197 - externalTmpPath: `new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-1")` - stagingDir: `getExternalScratchDir(extURI, hadoopConf, stagingDir)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1059245083 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala: ## @@ -105,4 +112,164 @@ trait V1WritesHiveUtils { .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true")) .getOrElse(Map.empty) } + + def setupCompression( + fileSinkConf: FileSinkDesc, + hadoopConf: Configuration, + sparkSession: SparkSession): Unit = { +val isCompressed = + fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match { +case formatName if formatName.endsWith("orcoutputformat") => + // For ORC,"mapreduce.output.fileoutputformat.compress", + // "mapreduce.output.fileoutputformat.compress.codec", and + // "mapreduce.output.fileoutputformat.compress.type" + // have no impact because it uses table properties to store compression information. + false +case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean + } + +if (isCompressed) { + hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(hadoopConf +.get("mapreduce.output.fileoutputformat.compress.codec")) + fileSinkConf.setCompressType(hadoopConf +.get("mapreduce.output.fileoutputformat.compress.type")) +} else { + // Set compression by priority + HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf) +.foreach { case (compression, codec) => hadoopConf.set(compression, codec) } +} + } + + /** + * Return two paths: + * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath` + * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-1` Review Comment: It is not always the parent: for old Hive versions, the two paths are the same. So if we want to drop one of the paths, we would have to check the Hive version again before using it. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1059244731 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala: ## @@ -157,117 +84,18 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU } } - // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 - private def oldVersionExternalTempPath( Review Comment: This code is moved to `V1WritesHiveUtils`, so that the `InsertIntoHiveTable` object can use it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1059221668 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala: ## @@ -157,117 +84,18 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU } } - // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 - private def oldVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - scratchDir: String): Path = { -val extURI: URI = path.toUri -val scratchPath = new Path(scratchDir, executionId) -var dirPath = new Path( - extURI.getScheme, - extURI.getAuthority, - scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID()) - -try { - val fs: FileSystem = dirPath.getFileSystem(hadoopConf) - dirPath = new Path(fs.makeQualified(dirPath).toString()) - - if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) { -throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString) - } - createdTempDir = Some(dirPath) - fs.deleteOnExit(dirPath) -} catch { - case e: IOException => -throw QueryExecutionErrors.cannotCreateStagingDirError(dirPath.toString, e) -} -dirPath - } - - // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 - private def newVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { -val extURI: URI = path.toUri -if (extURI.getScheme == "viewfs") { - getExtTmpPathRelTo(path, hadoopConf, stagingDir) -} else { - new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-1") -} - } - - private def getExtTmpPathRelTo( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { -new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-1") // Hive uses 1 - } - - private def getExternalScratchDir( - extURI: URI, - hadoopConf: Configuration, - stagingDir: String): Path = { -getStagingDir( - new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), - hadoopConf, - stagingDir) - 
} - - private[hive] def getStagingDir( - inputPath: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { -val inputPathName: String = inputPath.toString -val fs: FileSystem = inputPath.getFileSystem(hadoopConf) -var stagingPathName: String = - if (inputPathName.indexOf(stagingDir) == -1) { -new Path(inputPathName, stagingDir).toString - } else { -inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) - } - -// SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the -// staging directory needs to avoid being deleted when users set hive.exec.stagingdir -// under the table directory. -if (isSubDir(new Path(stagingPathName), inputPath, fs) && - !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) { - logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + -"with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " + -"directory.") - stagingPathName = new Path(inputPathName, ".hive-staging").toString -} - -val dir: Path = - fs.makeQualified( -new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) -logDebug("Created staging dir = " + dir + " for path = " + inputPath) + protected def createExternalTmpPath(dir: Path, hadoopConf: Configuration): Unit = { +val fs: FileSystem = dir.getFileSystem(hadoopConf) try { if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") } - createdTempDir = Some(dir) Review Comment: The global variable `createdTempDir` is really a hack. Since the staging dir is now specified explicitly, we can pass it to the method `deleteExternalTmpPath`, so we do not need the variable anymore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1059221992 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala: ## @@ -92,29 +95,13 @@ case class InsertIntoHiveTable( */ override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = { val externalCatalog = sparkSession.sharedState.externalCatalog -val hadoopConf = sparkSession.sessionState.newHadoopConf() - -val hiveQlTable = HiveClientImpl.toHiveTable(table) -// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer -// instances within the closure, since Serializer is not serializable while TableDesc is. -val tableDesc = new TableDesc( - hiveQlTable.getInputFormatClass, - // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because - // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to - // substitute some output formats, e.g. substituting SequenceFileOutputFormat to - // HiveSequenceFileOutputFormat. - hiveQlTable.getOutputFormatClass, - hiveQlTable.getMetadata -) -val tableLocation = hiveQlTable.getDataLocation -val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation) - +createExternalTmpPath(stagingDir, hadoopConf) try { - processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child) + processInsert(sparkSession, externalCatalog, child) Review Comment: now the code looks like: ```scala create stagingDir try { processInsert } finally { delete stagingDir } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1059221668 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala: ## @@ -157,117 +84,18 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU } } - // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13 - private def oldVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - scratchDir: String): Path = { -val extURI: URI = path.toUri -val scratchPath = new Path(scratchDir, executionId) -var dirPath = new Path( - extURI.getScheme, - extURI.getAuthority, - scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID()) - -try { - val fs: FileSystem = dirPath.getFileSystem(hadoopConf) - dirPath = new Path(fs.makeQualified(dirPath).toString()) - - if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) { -throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString) - } - createdTempDir = Some(dirPath) - fs.deleteOnExit(dirPath) -} catch { - case e: IOException => -throw QueryExecutionErrors.cannotCreateStagingDirError(dirPath.toString, e) -} -dirPath - } - - // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2 - private def newVersionExternalTempPath( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { -val extURI: URI = path.toUri -if (extURI.getScheme == "viewfs") { - getExtTmpPathRelTo(path, hadoopConf, stagingDir) -} else { - new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-1") -} - } - - private def getExtTmpPathRelTo( - path: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { -new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-1") // Hive uses 1 - } - - private def getExternalScratchDir( - extURI: URI, - hadoopConf: Configuration, - stagingDir: String): Path = { -getStagingDir( - new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath), - hadoopConf, - stagingDir) - 
} - - private[hive] def getStagingDir( - inputPath: Path, - hadoopConf: Configuration, - stagingDir: String): Path = { -val inputPathName: String = inputPath.toString -val fs: FileSystem = inputPath.getFileSystem(hadoopConf) -var stagingPathName: String = - if (inputPathName.indexOf(stagingDir) == -1) { -new Path(inputPathName, stagingDir).toString - } else { -inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length) - } - -// SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the -// staging directory needs to avoid being deleted when users set hive.exec.stagingdir -// under the table directory. -if (isSubDir(new Path(stagingPathName), inputPath, fs) && - !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) { - logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " + -"with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " + -"directory.") - stagingPathName = new Path(inputPathName, ".hive-staging").toString -} - -val dir: Path = - fs.makeQualified( -new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID)) -logDebug("Created staging dir = " + dir + " for path = " + inputPath) + protected def createExternalTmpPath(dir: Path, hadoopConf: Configuration): Unit = { +val fs: FileSystem = dir.getFileSystem(hadoopConf) try { if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) { throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'") } - createdTempDir = Some(dir) Review Comment: The global variable `createdTempDir` is really a hack. Since the staging dir is now specified explicitly, we can pass it to the method `deleteExternalTmpPath`, so we do not need the variable anymore. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`
ulysses-you commented on code in PR #39277: URL: https://github.com/apache/spark/pull/39277#discussion_r1059221271 ## sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala: ## @@ -105,4 +112,164 @@ trait V1WritesHiveUtils { .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true")) .getOrElse(Map.empty) } + + def setupCompression( + fileSinkConf: FileSinkDesc, + hadoopConf: Configuration, + sparkSession: SparkSession): Unit = { +val isCompressed = + fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match { +case formatName if formatName.endsWith("orcoutputformat") => + // For ORC,"mapreduce.output.fileoutputformat.compress", + // "mapreduce.output.fileoutputformat.compress.codec", and + // "mapreduce.output.fileoutputformat.compress.type" + // have no impact because it uses table properties to store compression information. + false +case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean + } + +if (isCompressed) { + hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(hadoopConf +.get("mapreduce.output.fileoutputformat.compress.codec")) + fileSinkConf.setCompressType(hadoopConf +.get("mapreduce.output.fileoutputformat.compress.type")) +} else { + // Set compression by priority + HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf) +.foreach { case (compression, codec) => hadoopConf.set(compression, codec) } +} + } + + /** + * Return two paths: + * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath` + * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-1` + * The call side should create `stagingDir` before using `externalTmpPath` and + * delete `stagingDir` at the end. + */ + protected def getExternalTmpPath( Review Comment: This is the key change for hive insertion. 
Previously, this method had the side effect of creating the stagingDir. Now, this method returns two paths: one is the staging dir to be created and the other is the original externalTmpPath. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org