Github user HyukjinKwon commented on a diff in the pull request:
https://github.com/apache/spark/pull/15131#discussion_r79302542
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1426,41 +1426,53 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * supported for Hadoop-supported filesystems.
    */
   def addFile(path: String, recursive: Boolean): Unit = {
-    val uri = new URI(path)
-    val schemeCorrectedPath = uri.getScheme match {
-      case null | "local" => new File(path).getCanonicalFile.toURI.toString
-      case _ => path
-    }
-
-    val hadoopPath = new Path(schemeCorrectedPath)
-    val scheme = new URI(schemeCorrectedPath).getScheme
-    if (!Array("http", "https", "ftp").contains(scheme)) {
-      val fs = hadoopPath.getFileSystem(hadoopConfiguration)
-      val isDir = fs.getFileStatus(hadoopPath).isDirectory
-      if (!isLocal && scheme == "file" && isDir) {
-        throw new SparkException(s"addFile does not support local directories when not running " +
-          "local mode.")
+    if (path == null) {
+      logWarning("null specified as parameter to addFile")
+    } else {
+      var key = ""
+      if (path.contains("\\")) {
+        // For local paths with backslashes on Windows, URI throws an exception
+        key = env.rpcEnv.fileServer.addFile(new File(path))
+      } else {
+        val uri = new URI(path)
+        val schemeCorrectedPath = uri.getScheme match {
+          case null | "local" => new File(path).getCanonicalFile.toURI.toString
+          case _ => path
+        }
+
+        val hadoopPath = new Path(schemeCorrectedPath)
+        val scheme = new URI(schemeCorrectedPath).getScheme
+        if (!Array("http", "https", "ftp").contains(scheme)) {
+          val fs = hadoopPath.getFileSystem(hadoopConfiguration)
+          val isDir = fs.getFileStatus(hadoopPath).isDirectory
+          if (!isLocal && scheme == "file" && isDir) {
+            throw new SparkException(s"addFile does not support local directories when not " +
+              "running local mode.")
+          }
+          if (!recursive && isDir) {
+            throw new SparkException(s"Added file $hadoopPath is a directory and recursive " +
+              "is not turned on.")
+          }
+        }
+
+        key = if (!isLocal && scheme == "file") {
+          env.rpcEnv.fileServer.addFile(new File(uri.getPath))
+        } else {
+          schemeCorrectedPath
+        }
       }
-      if (!recursive && isDir) {
-        throw new SparkException(s"Added file $hadoopPath is a directory and recursive is not " +
-          "turned on.")
+      if (key != null) {
+        val timestamp = System.currentTimeMillis
+        if (addedFiles.putIfAbsent(key, timestamp).isEmpty) {
+          logInfo(s"Added file $path at $key with timestamp $timestamp")
+          // Fetch the file locally so that closures which are run on the driver can still use the
+          // SparkFiles API to access files.
+          Utils.fetchFile(path, new File(SparkFiles.getRootDirectory()), conf, env.securityManager,
--- End diff --
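For context (not part of the diff), here is a minimal standalone sketch of why the new `path.contains("\\")` branch short-circuits URI parsing: java.net.URI rejects Windows-style backslash paths, so the patch hands such a path straight to the RPC file server as a File instead of building a URI from it. The path value and object name below are hypothetical.

import java.io.File
import java.net.{URI, URISyntaxException}

object WindowsPathUriSketch {
  def main(args: Array[String]): Unit = {
    val windowsPath = "C:\\tmp\\data.txt"  // hypothetical local Windows path
    try {
      // '\' is not a legal character in a URI, so parsing throws
      new URI(windowsPath)
    } catch {
      case e: URISyntaxException =>
        println(s"URI parsing failed: ${e.getMessage}")
    }
    // Wrapping the raw string in a File succeeds regardless, which is what the
    // backslash branch relies on before passing it to the file server
    println(new File(windowsPath))
  }
}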
(@yanboliang BTW - I don't mind triggering builds manually. Please feel free to submit more commits for test purposes if you'd like.)