Github user liancheng commented on a diff in the pull request:
https://github.com/apache/spark/pull/2765#discussion_r19327366
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
---
@@ -118,6 +122,41 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <:
NewInputFormat[K,V] : Clas
(newFiles, filter.minNewFileModTime)
}
+ def getPathList(path: Path, fs: FileSystem): List[Path] = { // Gathers `path` plus the sub-paths found in `depth` rounds of getSubPathList, then keeps those modified after `ignoreTime`.
+ var pathList = List[Path]() // accumulated result across all levels
+ pathList = path :: pathList // the starting path itself is always a candidate
+ var tmp = List[Path]() // paths of the level currently being expanded
+ tmp = path :: tmp
+ for (i <- 0 until depth) { // `depth` is defined outside this method; `i` is unused
+ tmp = getSubPathList(tmp, fs) // step one level down
+ pathList = tmp ::: pathList // each newly found level is prepended to the result
+ }
+ pathList.filter {
+ path => // shadows the method parameter `path`
+ val modTime = fs.getFileStatus(path).getModificationTime // one getFileStatus call per candidate
+ logDebug(s"Mod time for $path is $modTime")
+ if (modTime > ignoreTime) { // strict: a mod time equal to `ignoreTime` is rejected
+ logDebug(s"Mod time $modTime more than ignore time $ignoreTime")
+ true
+ } else {
+ false
+ }
+ }
+ }
+
+ def getSubPathList(path: List[Path], fs: FileSystem): List[Path] = { // Returns the entries `fs.listStatus` reports under each given path, restricted by a SubPathFilter.
+ val filter = new SubPathFilter() // defined outside this view; what it accepts is not visible here
+ var pathList = List[Path]()
+ path.map { // used only for its side effect; the mapped result is discarded
+ subPath =>
+ fs.listStatus(subPath, filter).map {
+ x =>
+ pathList = x.getPath() :: pathList // prepending, so output order is the reverse of listing order
+ }
+ }
+ pathList
+ }
--- End diff --
I'd suggest refactoring `getPathList` and `getSubPathList` into the
following version, which also addresses the performance issue [mentioned
above](https://github.com/apache/spark/pull/2765/files#r19327246) by replacing
individual `getFileStatus` calls with batch-style `listStatus` calls:
```scala
/**
 * Lists the entries under `path`, expanding directories for at most `depth` levels, and
 * returns the paths of those whose modification time is at or after `ignoreTime`.
 *
 * @param path root path to start from
 * @param fs   file system used to stat `path` and list directories
 * @return paths of the entries that passed the modification-time check
 */
def getPathList(path: Path, fs: FileSystem): List[Path] = {
  // A non-directory is returned as-is; a directory is expanded with a single batch
  // `listStatus` call. Anything reached once the depth budget has gone negative is dropped.
  def dfs(status: FileStatus, currentDepth: Int): List[FileStatus] =
    if (currentDepth < 0) Nil
    else if (!status.isDirectory) status :: Nil
    else fs.listStatus(status.getPath).toList.flatMap(dfs(_, currentDepth - 1))

  dfs(fs.getFileStatus(path), depth).filter { status =>
    val isNewlyModified = status.getModificationTime >= ignoreTime
    // Log only for entries that actually pass, so the message is never emitted for
    // files the filter is about to reject.
    if (isNewlyModified) {
      logDebug("Newly modified file detected, modification time is " +
        status.getModificationTime)
    }
    isNewlyModified
  }.map(_.getPath)
}
```
Also, the `fs.getFileStatus(path).isDirectory()` call can be removed with
this version.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]