Repository: spark
Updated Branches:
  refs/heads/branch-1.0 1c12b0b5c -> 885489112


[SPARK-2362] Fix for newFilesOnly logic in file DStream

The newFilesOnly logic should be inverted: if the flag newFilesOnly==true, only
files modified after the current time should be read. As the code is now, if
newFilesOnly==true it reads every file modified after 0L (that is, every file
in the directory).
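
The intended behaviour can be illustrated with a small, simplified sketch. It
assumes, following the code comment in FileInputDStream, that a file is skipped
when its modification time is earlier than ignoreTime. The object and method
names (NewFilesOnlySketch, initialIgnoreTime, isAccepted) are hypothetical, and
the current time is passed in as `now` instead of calling
System.currentTimeMillis() so the result is deterministic. The real path filter
does more work than this.

```scala
object NewFilesOnlySketch {
  // Hypothetical helper mirroring the patched expression:
  // newFilesOnly == true  -> threshold is "now", so only newer files pass;
  // newFilesOnly == false -> threshold is 0L, so every existing file passes.
  def initialIgnoreTime(newFilesOnly: Boolean, now: Long): Long =
    if (newFilesOnly) now else 0L

  // Assumption taken from the code comment: files with a mod time
  // earlier than ignoreTime are ignored.
  def isAccepted(modTime: Long, ignoreTime: Long): Boolean =
    modTime >= ignoreTime

  def main(args: Array[String]): Unit = {
    val now = 1000L
    val modTimes = Seq(0L, 500L, 1500L)

    val newOnly = modTimes.filter(isAccepted(_, initialIgnoreTime(newFilesOnly = true, now)))
    val all = modTimes.filter(isAccepted(_, initialIgnoreTime(newFilesOnly = false, now)))

    println(newOnly) // List(1500)
    println(all)     // List(0, 500, 1500)
  }
}
```

With the pre-fix expression the two branches are swapped, so
newFilesOnly = true would produce List(0, 500, 1500).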

Author: Gabriele Nizzoli <m...@nizzoli.net>

Closes #1077 from gabrielenizzoli/master and squashes the following commits:

4f1d261 [Gabriele Nizzoli] Fix for newFilesOnly logic in file DStream

(cherry picked from commit e6f7bfcfbf6aff7a9f8cd8e0a2166d0bf62b0912)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88548911
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88548911
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88548911

Branch: refs/heads/branch-1.0
Commit: 885489112c82eb909df7efbf0515fd7abfae41a4
Parents: 1c12b0b
Author: Gabriele Nizzoli <m...@nizzoli.net>
Authored: Tue Jul 8 14:23:38 2014 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Jul 8 14:24:50 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/88548911/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index e878285..9eecbfa 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -45,7 +45,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
   // Files with mod time earlier than this is ignored. This is updated every interval
   // such that in the current interval, files older than any file found in the
   // previous interval will be ignored. Obviously this time keeps moving forward.
-  private var ignoreTime = if (newFilesOnly) 0L else System.currentTimeMillis()
+  private var ignoreTime = if (newFilesOnly) System.currentTimeMillis() else 0L
 
   // Latest file mod time seen till any point of time
   @transient private var path_ : Path = null
