Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2765#discussion_r25502255
  
    --- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
    @@ -173,8 +180,48 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
           val filter = new PathFilter {
             def accept(path: Path): Boolean = isNewFile(path, currentTime, 
modTimeIgnoreThreshold)
           }
    -      val newFiles = fs.listStatus(directoryPath, 
filter).map(_.getPath.toString)
    -      val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime
    +      val directoryDepth = fs.getFileStatus(directoryPath).getPath.depth()
    +
    +      // Nested directories to find new files.
    +      def dfs(status: FileStatus): List[FileStatus] = {
    +        val path = status.getPath
    +        val depthFilter = depth + directoryDepth - path.depth()
    +        if (status.isDir) {
    +          if (depthFilter - 1 >= 0) {
    +            if (lastFoundDirs.contains(path)) {
    +              if (status.getModificationTime > modTimeIgnoreThreshold) {
    +                fs.listStatus(path).toList.flatMap(dfs(_))
    +              } else Nil
    +            } else {
    +              lastFoundDirs += path
    +              fs.listStatus(path).toList.flatMap(dfs(_))
    +            }
    +          } else Nil
    +        } else {
    +          if (filter.accept(path)) status :: Nil else Nil
    +        }
    +      }
    +
    +      val path = if (lastFoundDirs.isEmpty) 
Seq(fs.getFileStatus(directoryPath))
    +      else {
    +        lastFoundDirs.filter { path =>
    +          // If the mod time of directory is more than ignore time, no new 
files in this directory.
    +          try {
    +            val status = fs.getFileStatus(path)
    +            if (status != null && status.getModificationTime > 
modTimeIgnoreThreshold) true
    --- End diff --
    
    You can get rid of the entire `if`, since you're effectively writing `if (x) true else 
false`. Just write `status != null && status.getModificationTime > 
modTimeIgnoreThreshold`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it to be, or if the feature is enabled but not working,
please contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to