[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

2023-01-08 Thread GitBox


ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1064238626


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##
@@ -294,3 +285,40 @@ case class InsertIntoHiveTable(
   override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoHiveTable =
     copy(query = newChild)
 }
+
+object InsertIntoHiveTable extends V1WritesHiveUtils with Logging {

Review Comment:
   Oh, I missed cleaning that up. I'll remove it when I touch the related code.






[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

2023-01-05 Thread GitBox


ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1062552806


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##
@@ -92,29 +98,17 @@ case class InsertIntoHiveTable(
    */
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val externalCatalog = sparkSession.sharedState.externalCatalog
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
-    val hiveQlTable = HiveClientImpl.toHiveTable(table)
-    // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
-    // instances within the closure, since Serializer is not serializable while TableDesc is.
-    val tableDesc = new TableDesc(
-      hiveQlTable.getInputFormatClass,
-      // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
-      // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
-      // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
-      // HiveSequenceFileOutputFormat.
-      hiveQlTable.getOutputFormatClass,
-      hiveQlTable.getMetadata
-    )
-    val tableLocation = hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
+    val hadoopConf = externalTmpPath.hadoopConf
+    val stagingDir = externalTmpPath.stagingDir
+    val tmpLocation = externalTmpPath.externalTempPath
 
+    createExternalTmpPath(stagingDir, hadoopConf)
     try {
-      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child)
+      processInsert(sparkSession, externalCatalog, hadoopConf, tmpLocation, child)
     } finally {
       // Attempt to delete the staging directory and the inclusive files. If failed, the files are
       // expected to be dropped at the normal termination of VM since deleteOnExit is used.
-      deleteExternalTmpPath(hadoopConf)
+      deleteExternalTmpPath(stagingDir, hadoopConf)

Review Comment:
   addressed






[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

2023-01-05 Thread GitBox


ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1062551972


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTempPath.scala:
##
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import java.io.IOException
+import java.net.URI
+import java.text.SimpleDateFormat
+import java.util.{Date, Locale, Random}
+
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.hive.common.FileUtils
+import org.apache.hadoop.hive.ql.exec.TaskRunner
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.hive.HiveExternalCatalog
+import org.apache.spark.sql.hive.client.HiveVersion
+
+class HiveTempPath(session: SparkSession, val hadoopConf: Configuration, path: Path)
+  extends Logging {
+  private var stagingDirForCreating: Option[Path] = None
+
+  lazy val externalTempPath: Path = getExternalTmpPath(path)
+
+  private def getExternalTmpPath(path: Path): Path = {
+    import org.apache.spark.sql.hive.client.hive._
+
+    // Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
+    // a common scratch directory. After the writing is finished, Hive will simply empty the table
+    // directory and move the staging directory to it.
+    // After Hive 1.1, Hive will create the staging directory under the table directory, and when
+    // moving staging directory to table directory, Hive will still empty the table directory, but
+    // will exclude the staging directory there.
+    // We have to follow the Hive behavior here, to avoid troubles. For example, if we create
+    // staging directory under the table director for Hive prior to 1.1, the staging directory will
+    // be removed by Hive when Hive is trying to empty the table directory.
+    val hiveVersionsUsingOldExternalTempPath: Set[HiveVersion] = Set(v12, v13, v14, v1_0)
+    val hiveVersionsUsingNewExternalTempPath: Set[HiveVersion] =
+      Set(v1_1, v1_2, v2_0, v2_1, v2_2, v2_3, v3_0, v3_1)
+
+    // Ensure all the supported versions are considered here.
+    assert(hiveVersionsUsingNewExternalTempPath ++ hiveVersionsUsingOldExternalTempPath ==
+      allSupportedHiveVersions)
+
+    val externalCatalog = session.sharedState.externalCatalog
+    val hiveVersion = externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client.version
+    val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+    val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
+
+    if (hiveVersionsUsingOldExternalTempPath.contains(hiveVersion)) {
+      oldVersionExternalTempPath(path, scratchDir)
+    } else if (hiveVersionsUsingNewExternalTempPath.contains(hiveVersion)) {
+      newVersionExternalTempPath(path, stagingDir)
+    } else {
+      throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
+    }
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
+  private def oldVersionExternalTempPath(path: Path, scratchDir: String): Path = {
+    val extURI: URI = path.toUri
+    val scratchPath = new Path(scratchDir, executionId)
+    var dirPath = new Path(
+      extURI.getScheme,
+      extURI.getAuthority,
+      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
+
+    val fs = dirPath.getFileSystem(hadoopConf)
+    dirPath = new Path(fs.makeQualified(dirPath).toString())
+    stagingDirForCreating = Some(dirPath)
+    dirPath
+  }
+
+  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
+  private def newVersionExternalTempPath(path: Path, stagingDir: String): Path = {
+    val extURI: URI = path.toUri
+    if (extURI.getScheme == "viewfs") {
+      val qualifiedStagingDir = getStagingDir(path, stagingDir)
+      stagingDirForCreating = Some(qualifiedStagingDir)
+      // Hive uses 1
+      new Path(qualifiedStagingDir, "-ext-1")
+    } else {
+      val qualifiedStagingDir = 

[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

2023-01-04 Thread GitBox


ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1062052748


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala:
##
@@ -105,4 +112,164 @@ trait V1WritesHiveUtils {
       .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true"))
       .getOrElse(Map.empty)
   }
+
+  def setupCompression(
+      fileSinkConf: FileSinkDesc,
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Unit = {
+    val isCompressed =
+      fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match {
+        case formatName if formatName.endsWith("orcoutputformat") =>
+          // For ORC,"mapreduce.output.fileoutputformat.compress",
+          // "mapreduce.output.fileoutputformat.compress.codec", and
+          // "mapreduce.output.fileoutputformat.compress.type"
+          // have no impact because it uses table properties to store compression information.
+          false
+        case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean
+      }
+
+    if (isCompressed) {
+      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.codec"))
+      fileSinkConf.setCompressType(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.type"))
+    } else {
+      // Set compression by priority
+      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+        .foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
+    }
+  }
+
+  /**
+   * Return two paths:
+   * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath`
+   * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-1`
+   * The call side should create `stagingDir` before using `externalTmpPath` and
+   * delete `stagingDir` at the end.

Review Comment:
   Wrapped it in `HiveTempPath`, since it will also be used by `InsertIntoHiveDirCommand`.
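   As a rough illustration of what the wrapper buys us, based on the `HiveTempPath` signature quoted in this thread (the surrounding command code is only a sketch, not the final API):

   ```scala
   // Both InsertIntoHiveTable and InsertIntoHiveDirCommand can build the same helper
   // instead of each re-deriving the version-dependent staging/temp paths:
   val hadoopConf = sparkSession.sessionState.newHadoopConf()
   val tmpPath = new HiveTempPath(sparkSession, hadoopConf, hiveQlTable.getDataLocation)
   // Lazily resolves the location that tasks should actually write to.
   val tmpLocation = tmpPath.externalTempPath
   ```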






[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

2023-01-04 Thread GitBox


ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1062020277


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala:
##
@@ -53,13 +59,17 @@ case class WriteFiles(child: LogicalPlan) extends UnaryNode {
 /**
  * Responsible for writing files.
  */
-case class WriteFilesExec(child: SparkPlan) extends UnaryExecNode {
+case class WriteFilesExec(
+    child: SparkPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    staticPartitions: TablePartitionSpec) extends UnaryExecNode {
   override def output: Seq[Attribute] = Seq.empty
 
-  override protected def doExecuteWrite(writeSpec: WriteSpec): RDD[WriterCommitMessage] = {
-    assert(writeSpec.isInstanceOf[WriteFilesSpec])
-    val writeFilesSpec: WriteFilesSpec = writeSpec.asInstanceOf[WriteFilesSpec]
-
+  override protected def doExecuteWrite(
+      writeFilesSpec: WriteFilesSpec): RDD[WriterCommitMessage] = {

Review Comment:
   Seems it's a bit hard. Look at the current information:
   ```scala
   case class WriteFilesSpec(
       description: WriteJobDescription,
       committer: FileCommitProtocol,
       concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])
   ```
   - `ConcurrentOutputWriterSpec` and `FileCommitProtocol` contain the output spec, so we cannot replace them.
   - `WriteJobDescription` contains a lot of information, including what we pull out, but if we want to trim anything inside `WriteJobDescription`, we would need a new class to hold the rest. I'm not sure that is worth doing.
   
   ```scala
   class WriteJobDescription(
   val uuid: String,
   val serializableHadoopConf: SerializableConfiguration,
   val outputWriterFactory: OutputWriterFactory,
   val allColumns: Seq[Attribute],
   val dataColumns: Seq[Attribute],
   val partitionColumns: Seq[Attribute],
   val bucketSpec: Option[WriterBucketSpec],
   val path: String,
   val customPartitionLocations: Map[TablePartitionSpec, String],
   val maxRecordsPerFile: Long,
   val timeZoneId: String,
   val statsTrackers: Seq[WriteJobStatsTracker])
   ```






[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

2022-12-29 Thread GitBox


ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059246929


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteFiles.scala:
##
@@ -38,13 +38,18 @@ case class WriteFilesSpec(
     description: WriteJobDescription,
     committer: FileCommitProtocol,
     concurrentOutputWriterSpecFunc: SparkPlan => Option[ConcurrentOutputWriterSpec])
-  extends WriteSpec
 
 /**
  * During Optimizer, [[V1Writes]] injects the [[WriteFiles]] between [[V1WriteCommand]] and query.
  * [[WriteFiles]] must be the root plan as the child of [[V1WriteCommand]].
  */
-case class WriteFiles(child: LogicalPlan) extends UnaryNode {
+case class WriteFiles(
+    child: LogicalPlan,
+    fileFormat: FileFormat,
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    requiredOrdering: Seq[SortOrder]) extends UnaryNode {

Review Comment:
   How about pulling out `partitionSpec` instead? `partitionColumns` does not contain the insertion partition spec.
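   To make the distinction concrete, a purely hypothetical example (values invented for illustration, assuming a table partitioned by `p1` and `p2`):

   ```scala
   // For a statement like: INSERT INTO t PARTITION (p1 = 'a', p2) SELECT ...
   // - partitionColumns describes which columns partition the table (here: p1, p2),
   // - the insertion partition spec carries the static values supplied by this INSERT.
   val staticPartitionSpec: Map[String, String] = Map("p1" -> "a")
   ```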






[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

2022-12-29 Thread GitBox


ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059246404


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##
@@ -73,16 +74,18 @@ case class InsertIntoHiveTable(
     query: LogicalPlan,
     overwrite: Boolean,
     ifPartitionNotExists: Boolean,
-    outputColumnNames: Seq[String]
+    outputColumnNames: Seq[String],
+    partitionColumns: Seq[Attribute],
+    bucketSpec: Option[BucketSpec],
+    options: Map[String, String],
+    fileFormat: FileFormat,
+    externalTmpPath: String,
+    @transient stagingDir: Path,

Review Comment:
   For old Hive versions, `externalTmpPath` and `stagingDir` are the same:
   
   https://github.com/apache/spark/blob/a3c837ae2eaf2c7ba08563b7afa0f96df8a4e80b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala#L129-L136
   
   For new Hive versions:
   
   https://github.com/apache/spark/blob/a3c837ae2eaf2c7ba08563b7afa0f96df8a4e80b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala#L189-L197
   
   - externalTmpPath: `new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-1")`
   - stagingDir: `getExternalScratchDir(extURI, hadoopConf, stagingDir)`
   






[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

2022-12-29 Thread GitBox


ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059245083


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala:
##
@@ -105,4 +112,164 @@ trait V1WritesHiveUtils {
       .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true"))
       .getOrElse(Map.empty)
   }
+
+  def setupCompression(
+      fileSinkConf: FileSinkDesc,
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Unit = {
+    val isCompressed =
+      fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match {
+        case formatName if formatName.endsWith("orcoutputformat") =>
+          // For ORC,"mapreduce.output.fileoutputformat.compress",
+          // "mapreduce.output.fileoutputformat.compress.codec", and
+          // "mapreduce.output.fileoutputformat.compress.type"
+          // have no impact because it uses table properties to store compression information.
+          false
+        case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean
+      }
+
+    if (isCompressed) {
+      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.codec"))
+      fileSinkConf.setCompressType(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.type"))
+    } else {
+      // Set compression by priority
+      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+        .foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
+    }
+  }
+
+  /**
+   * Return two paths:
+   * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath`
+   * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-1`

Review Comment:
   It is not only the parent: for old Hive versions the two paths are the same. So if we wanted to drop one of them, we would have to check the Hive version again before using it.






[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

2022-12-29 Thread GitBox


ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059244731


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala:
##
@@ -157,117 +84,18 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU
     }
   }
 
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
-  private def oldVersionExternalTempPath(

Review Comment:
   This code is moved to `V1WritesHiveUtils` so that object `InsertIntoHiveTable` can use it.






[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

2022-12-29 Thread GitBox


ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059221668


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/SaveAsHiveFile.scala:
##
@@ -157,117 +84,18 @@ private[hive] trait SaveAsHiveFile extends DataWritingCommand with V1WritesHiveU
     }
   }
 
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
-  private def oldVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      scratchDir: String): Path = {
-    val extURI: URI = path.toUri
-    val scratchPath = new Path(scratchDir, executionId)
-    var dirPath = new Path(
-      extURI.getScheme,
-      extURI.getAuthority,
-      scratchPath.toUri.getPath + "-" + TaskRunner.getTaskRunnerID())
-
-    try {
-      val fs: FileSystem = dirPath.getFileSystem(hadoopConf)
-      dirPath = new Path(fs.makeQualified(dirPath).toString())
-
-      if (!FileUtils.mkdir(fs, dirPath, true, hadoopConf)) {
-        throw new IllegalStateException("Cannot create staging directory: " + dirPath.toString)
-      }
-      createdTempDir = Some(dirPath)
-      fs.deleteOnExit(dirPath)
-    } catch {
-      case e: IOException =>
-        throw QueryExecutionErrors.cannotCreateStagingDirError(dirPath.toString, e)
-    }
-    dirPath
-  }
-
-  // Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
-  private def newVersionExternalTempPath(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    val extURI: URI = path.toUri
-    if (extURI.getScheme == "viewfs") {
-      getExtTmpPathRelTo(path, hadoopConf, stagingDir)
-    } else {
-      new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-1")
-    }
-  }
-
-  private def getExtTmpPathRelTo(
-      path: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-1") // Hive uses 1
-  }
-
-  private def getExternalScratchDir(
-      extURI: URI,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    getStagingDir(
-      new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
-      hadoopConf,
-      stagingDir)
-  }
-
-  private[hive] def getStagingDir(
-      inputPath: Path,
-      hadoopConf: Configuration,
-      stagingDir: String): Path = {
-    val inputPathName: String = inputPath.toString
-    val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
-    var stagingPathName: String =
-      if (inputPathName.indexOf(stagingDir) == -1) {
-        new Path(inputPathName, stagingDir).toString
-      } else {
-        inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
-      }
-
-    // SPARK-20594: This is a walk-around fix to resolve a Hive bug. Hive requires that the
-    // staging directory needs to avoid being deleted when users set hive.exec.stagingdir
-    // under the table directory.
-    if (isSubDir(new Path(stagingPathName), inputPath, fs) &&
-      !stagingPathName.stripPrefix(inputPathName).stripPrefix("/").startsWith(".")) {
-      logDebug(s"The staging dir '$stagingPathName' should be a child directory starts " +
-        "with '.' to avoid being deleted if we set hive.exec.stagingdir under the table " +
-        "directory.")
-      stagingPathName = new Path(inputPathName, ".hive-staging").toString
-    }
-
-    val dir: Path =
-      fs.makeQualified(
-        new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
-    logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+  protected def createExternalTmpPath(dir: Path, hadoopConf: Configuration): Unit = {
+    val fs: FileSystem = dir.getFileSystem(hadoopConf)
     try {
       if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
         throw new IllegalStateException("Cannot create staging directory  '" + dir.toString + "'")
       }
-      createdTempDir = Some(dir)

Review Comment:
   The global variable `createdTempDir` is really a hack. Since we now pass an explicit staging dir, we can hand it to `deleteExternalTmpPath` and no longer need the variable.
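   A minimal sketch of the difference (the containing trait and the delete body here are purely illustrative; only the `deleteExternalTmpPath(stagingDir, hadoopConf)` signature comes from this PR's diff):

   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.hadoop.fs.Path

   trait TmpPathCleanup { // hypothetical container, just to make the contrast concrete
     // Before: the dir is remembered via a mutable field set as a side effect elsewhere.
     protected var createdTempDir: Option[Path] = None

     // After: the caller passes the staging dir it already holds, no shared state needed.
     protected def deleteExternalTmpPath(stagingDir: Path, hadoopConf: Configuration): Unit = {
       val fs = stagingDir.getFileSystem(hadoopConf)
       fs.delete(stagingDir, true) // illustrative body only
     }
   }
   ```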






[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

2022-12-29 Thread GitBox


ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059221992


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala:
##
@@ -92,29 +95,13 @@ case class InsertIntoHiveTable(
    */
   override def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row] = {
     val externalCatalog = sparkSession.sharedState.externalCatalog
-    val hadoopConf = sparkSession.sessionState.newHadoopConf()
-
-    val hiveQlTable = HiveClientImpl.toHiveTable(table)
-    // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
-    // instances within the closure, since Serializer is not serializable while TableDesc is.
-    val tableDesc = new TableDesc(
-      hiveQlTable.getInputFormatClass,
-      // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
-      // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
-      // substitute some output formats, e.g. substituting SequenceFileOutputFormat to
-      // HiveSequenceFileOutputFormat.
-      hiveQlTable.getOutputFormatClass,
-      hiveQlTable.getMetadata
-    )
-    val tableLocation = hiveQlTable.getDataLocation
-    val tmpLocation = getExternalTmpPath(sparkSession, hadoopConf, tableLocation)
-
+    createExternalTmpPath(stagingDir, hadoopConf)
     try {
-      processInsert(sparkSession, externalCatalog, hadoopConf, tableDesc, tmpLocation, child)
+      processInsert(sparkSession, externalCatalog, child)

Review Comment:
   now the code looks like:
   
   ```scala
   create stagingDir
   try {
 processInsert
   } finally {
 delete stagingDir
   }
   ```
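   Filled in with the concrete calls from this revision's diff (same structure as the pseudocode above, no new behavior implied):

   ```scala
   createExternalTmpPath(stagingDir, hadoopConf)
   try {
     processInsert(sparkSession, externalCatalog, child)
   } finally {
     // Best-effort cleanup; deleteOnExit covers the case where this fails.
     deleteExternalTmpPath(stagingDir, hadoopConf)
   }
   ```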






[GitHub] [spark] ulysses-you commented on a diff in pull request #39277: [SPARK-41708][SQL] Pull v1write information to `WriteFiles`

2022-12-29 Thread GitBox


ulysses-you commented on code in PR #39277:
URL: https://github.com/apache/spark/pull/39277#discussion_r1059221271


##
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/V1WritesHiveUtils.scala:
##
@@ -105,4 +112,164 @@ trait V1WritesHiveUtils {
       .map(_ => Map(BucketingUtils.optionForHiveCompatibleBucketWrite -> "true"))
       .getOrElse(Map.empty)
   }
+
+  def setupCompression(
+      fileSinkConf: FileSinkDesc,
+      hadoopConf: Configuration,
+      sparkSession: SparkSession): Unit = {
+    val isCompressed =
+      fileSinkConf.getTableInfo.getOutputFileFormatClassName.toLowerCase(Locale.ROOT) match {
+        case formatName if formatName.endsWith("orcoutputformat") =>
+          // For ORC,"mapreduce.output.fileoutputformat.compress",
+          // "mapreduce.output.fileoutputformat.compress.codec", and
+          // "mapreduce.output.fileoutputformat.compress.type"
+          // have no impact because it uses table properties to store compression information.
+          false
+        case _ => hadoopConf.get("hive.exec.compress.output", "false").toBoolean
+      }
+
+    if (isCompressed) {
+      hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
+      fileSinkConf.setCompressed(true)
+      fileSinkConf.setCompressCodec(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.codec"))
+      fileSinkConf.setCompressType(hadoopConf
+        .get("mapreduce.output.fileoutputformat.compress.type"))
+    } else {
+      // Set compression by priority
+      HiveOptions.getHiveWriteCompression(fileSinkConf.getTableInfo, sparkSession.sessionState.conf)
+        .foreach { case (compression, codec) => hadoopConf.set(compression, codec) }
+    }
+  }
+
+  /**
+   * Return two paths:
+   * 1. The first path is `stagingDir` which can be the parent path of `externalTmpPath`
+   * 2. The second path is `externalTmpPath`, e.g. `$stagingDir/-ext-1`
+   * The call side should create `stagingDir` before using `externalTmpPath` and
+   * delete `stagingDir` at the end.
+   */
+  protected def getExternalTmpPath(

Review Comment:
   This is the key change for Hive insertion. Before, this method had the side effect of creating the stagingDir. Now it returns two paths: the staging dir to be created, and the original externalTmpPath.
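   A tiny sketch of that "no side effect, just compute both paths" shape (the helper itself is hypothetical; the layout follows the new-Hive-version case quoted earlier in this thread, where the temp path is a child of the staging dir):

   ```scala
   import org.apache.hadoop.fs.Path

   // Caller contract per the doc comment above: create the first path before writing to the
   // second, and delete the first path at the end. For old Hive versions both are the same dir.
   def tmpPaths(qualifiedStagingDir: Path, extSubDir: String): (Path, Path) = {
     val externalTmpPath = new Path(qualifiedStagingDir, extSubDir) // e.g. the "-ext-..." child dir
     (qualifiedStagingDir, externalTmpPath)
   }
   ```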


