Github user merlintang commented on a diff in the pull request:
https://github.com/apache/spark/pull/15819#discussion_r88778830
--- Diff:
sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
---
@@ -54,6 +61,61 @@ case class InsertIntoHiveTable(
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@transient private lazy val catalog = sc.catalog
+ val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
+
+ private def executionId: String = {
+ val rand: Random = new Random
+ val format: SimpleDateFormat = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss_SSS")
+ val executionId: String = "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
+ return executionId
+ }
+
+ private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path = {
+ val inputPathUri: URI = inputPath.toUri
+ val inputPathName: String = inputPathUri.getPath
+ val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
+ val stagingPathName: String =
+ if (inputPathName.indexOf(stagingDir) == -1) {
+ new Path(inputPathName, stagingDir).toString
+ } else {
+ inputPathName.substring(0, inputPathName.indexOf(stagingDir) + stagingDir.length)
+ }
+ val dir: Path =
+ fs.makeQualified(
+ new Path(stagingPathName + "_" + executionId + "-" + TaskRunner.getTaskRunnerID))
+ logDebug("Created staging dir = " + dir + " for path = " + inputPath)
+ try {
+ if (!FileUtils.mkdir(fs, dir, true, hadoopConf)) {
+ throw new IllegalStateException("Cannot create staging directory '" + dir.toString + "'")
+ }
+ fs.deleteOnExit(dir)
+ }
+ catch {
+ case e: IOException =>
+ throw new RuntimeException(
--- End diff --
We use this code for the following reasons. (1) The old version relied on
the Hive package to create the staging directory. In the Hive code, each
staging directory is stored in a hash map, and these directories are removed
when the session is closed. However, our Spark code does not trigger the Hive
session close, so these directories are never removed. (2) The pushed code
mimics the way Hive creates the staging directory, but does so inside Spark
rather than relying on Hive, so the staging directory will be removed.
(3) I will fix the return type issue. Thanks for your comments, @srowen.
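
To illustrate point (1), here is a minimal sketch. It is not the Hive or
Spark code: `StagingRegistry` is a hypothetical stand-in for a session that
records staging directories in a map and deletes them only in `close()`. It
uses `java.nio.file` and a local temp directory instead of Hadoop's
`FileSystem`, an assumption made only to keep the example self-contained.

```scala
import java.nio.file.{Files, Path}
import scala.collection.mutable

// Hypothetical stand-in for a session that tracks staging directories
// in a map and removes them only when close() is called.
final class StagingRegistry {
  private val dirs = mutable.Map.empty[String, Path]

  // Create a staging directory and remember it under the given key.
  // The "hive-staging_" prefix is illustrative, not a real config value.
  def create(key: String): Path = {
    val dir = Files.createTempDirectory("hive-staging_")
    dirs(key) = dir
    dir
  }

  // Delete every tracked (empty) directory. If the caller never invokes
  // close(), nothing is deleted and the directories are left behind.
  def close(): Unit = {
    dirs.values.foreach(d => Files.deleteIfExists(d))
    dirs.clear()
  }
}
```

If `close()` is never called, as with the Hive session in our Spark code, the
directory stays on disk. That is why the patch creates the directory itself
and registers it with `fs.deleteOnExit(dir)`.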