Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/1616#discussion_r17618358
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -313,15 +313,83 @@ private[spark] object Utils extends Logging {
}
/**
+   * Download a file requested by the executor. Supports fetching the file in a variety of
+   * ways, including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+   *
+   * If `useCache` is true, first attempts to fetch the file from a local cache that's shared
+   * across executors running the same application.
+   *
+   * Throws SparkException if the target file already exists and has different contents than
+   * the requested file.
+   */
+  def fetchFile(
+      url: String,
+      targetDir: File,
+      conf: SparkConf,
+      securityMgr: SecurityManager,
+      hadoopConf: Configuration,
+      timestamp: Long,
+      useCache: Boolean) {
+    val fileName = url.split("/").last
+    val targetFile = new File(targetDir, fileName)
+    if (useCache) {
+      val cachedFileName = s"${url.hashCode}${timestamp}_cache"
+      val lockFileName = s"${url.hashCode}${timestamp}_lock"
+      val localDir = new File(getLocalDir(conf))
+      val lockFile = new File(localDir, lockFileName)
+      val raf = new RandomAccessFile(lockFile, "rw")
+      // Only one executor downloads the file into the cache at a time.
+      // The FileLock is only used to synchronize executors downloading the same file;
+      // it is safe regardless of the lock type (mandatory or advisory).
+      val lock = raf.getChannel().lock()
+      val cachedFile = new File(localDir, cachedFileName)
+      try {
+        if (!cachedFile.exists()) {
+          doFetchFile(url, localDir, cachedFileName, conf, securityMgr, hadoopConf)
+        }
+      } finally {
+        lock.release()
+      }
+      if (targetFile.exists && !Files.equal(cachedFile, targetFile)) {
+        if (conf.getBoolean("spark.files.overwrite", false)) {
+          targetFile.delete()
+          logInfo(("File %s exists and does not match contents of %s, " +
+            "replacing it with %s").format(targetFile, url, url))
+        } else {
+          throw new SparkException(
+            s"File $targetFile exists and does not match contents of $url")
+        }
+      }
+      Files.copy(cachedFile, targetFile)
+    } else {
+      doFetchFile(url, targetDir, fileName, conf, securityMgr, hadoopConf)
+    }
+
+    // Decompress the file if it's a .tar or .tar.gz
+    if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
+    } else if (fileName.endsWith(".tar")) {
+      logInfo("Untarring " + fileName)
+      Utils.execute(Seq("tar", "-xf", fileName), targetDir)
+    }
+    // Make the file executable; that's necessary for scripts
+    FileUtil.chmod(targetFile.getAbsolutePath, "a+x")
--- End diff --
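For context on the locking comment in the diff: the pattern is a java.nio FileLock used purely for inter-process mutual exclusion, so that only one executor per node performs the download into the shared cache. A minimal standalone sketch of that pattern, assuming a hypothetical fetchIntoCacheOnce helper with a caller-supplied download function (an illustration, not the PR's code):

import java.io.{File, RandomAccessFile}

// Hypothetical helper isolating the lock-guarded "download once per node" pattern.
def fetchIntoCacheOnce(localDir: File, cachedFileName: String)(download: File => Unit): File = {
  val lockFile = new File(localDir, cachedFileName + "_lock")
  val raf = new RandomAccessFile(lockFile, "rw")
  // lock() blocks until this process holds an exclusive inter-process lock, so at
  // most one executor performs the download; later arrivals find the cached file
  // already in place and skip the fetch.
  val lock = raf.getChannel().lock()
  val cachedFile = new File(localDir, cachedFileName)
  try {
    if (!cachedFile.exists()) {
      download(cachedFile)
    }
    cachedFile
  } finally {
    lock.release()
    raf.close()
  }
}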
I think this is related to the discussion above about using the cache to
avoid the network fetch, but still copying the file from the shared cache into
the executor's work directory. In that case, I think we need to perform the
decompression on the file in the work directory.
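A sketch of what that reordering might look like, assuming a hypothetical decompressInWorkDir helper inside Utils that is called after Files.copy(cachedFile, targetFile); it reuses logInfo and Utils.execute exactly as the diff above does:

import java.io.File

// Hypothetical helper: untar the copy in the executor's work directory (targetDir),
// not the file in the shared cache.
private def decompressInWorkDir(targetDir: File, fileName: String): Unit = {
  if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
    logInfo("Untarring " + fileName)
    // tar runs with targetDir as its working directory, so the archive contents are
    // extracted next to the work-directory copy rather than next to the cached file.
    Utils.execute(Seq("tar", "-xzf", fileName), targetDir)
  } else if (fileName.endsWith(".tar")) {
    logInfo("Untarring " + fileName)
    Utils.execute(Seq("tar", "-xf", fileName), targetDir)
  }
}

That way the shared cache keeps only the pristine archive, and each executor's work directory gets its own extracted copy.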