Github user pwendell commented on a diff in the pull request:
https://github.com/apache/spark/pull/3419#discussion_r20773446
--- Diff:
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
---
@@ -17,18 +17,55 @@
package org.apache.spark.streaming.dstream
-import java.io.{ObjectInputStream, IOException}
-import scala.collection.mutable.{HashSet, HashMap}
+import java.io.{IOException, ObjectInputStream}
+
+import scala.collection.mutable
import scala.reflect.ClassTag
+
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
-import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.rdd.UnionRDD
-import org.apache.spark.streaming.{StreamingContext, Time}
-import org.apache.spark.util.{TimeStampedHashMap, Utils}
+import org.apache.spark.rdd.{RDD, UnionRDD}
+import org.apache.spark.streaming._
+import org.apache.spark.util.{TimeStampedHashMap, Utils}
+/**
+ * This class represents an input stream that monitors a Hadoop-compatible filesystem for new
+ * files and creates a stream out of them. The way it works as follows.
+ *
+ * At each batch interval, the file system is queries for files in the given directory and
+ * detected new files are selected for that batch. In this case "new" means files that
+ * became visible to readers during that time period. Some extra care is needed to deal
+ * with the fact that files may become visible after they are created. For this purpose, this
+ * class remembers the information about the files selected in past batches for
+ * a certain duration (say, "remember window") as shown in the figure below.
+ *
+ *                      |<----- remember window ----->|
+ * ignore threshold --->|                             |<--- current batch time
+ *                      |____.____.____.____.____.____|
+ *                      |    |    |    |    |    |    |
+ * ---------------------|----|----|----|----|----|----|----------------------->   Time
+ *                      |____|____|____|____|____|____|
+ *                             remembered batches
+ *
+ * The trailing end of the window is the "ignore threshold" and all files whose mod times
+ * are less than this threshold are assumed to have already been selected and are therefore
+ * ignored. Files whose mode times are within the "remember window" are checked against files
+ * that have already been selected. At a high level, this is how new files are identified in
+ * each batch - files whose mod times are greater than the ignore threshold and
+ * have not been considered within the remember window. See the documentation on the method
+ * `isNewFile` for more details.
+ *
+ * This makes some assumptions from the underlying file system that the system is monitoring.
+ * - The clock of the file system is assumed to synchronized with the clock of the machine running
+ *   the streaming app.
+ * - If a file is to be visible in the directory listings, it must be visible within a certain
+ *   duration of the mod time of the file. This duration is the "remember window", which is set to
+ *   1 minute (see `FileInputDStream.REMEMBER_DURATION`). Otherwise, the file will not be
+ *   selected as the mod time will be less than the ignore threshold when it become visible.
--- End diff --
To make it a bit clearer, maybe say "the file will never be selected"
rather than "will not be selected".
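
To illustrate why such a file is never selected, here is a minimal Scala sketch of the selection rule described in the quoted doc comment. It is not the code from the pull request: the object name, the method signature, and the `recentlySelected` set are hypothetical, and only the 1-minute remember window is taken from the comment itself.

```scala
// Hypothetical sketch, not the code under review: all names are illustrative.
object NewFileSketch {
  // Remember window of 1 minute, as stated in the quoted doc comment.
  val rememberWindowMs: Long = 60 * 1000L

  /** A file is selected only if its mod time is greater than the ignore
    * threshold and it was not already selected within the remember window. */
  def isNewFile(
      modTimeMs: Long,
      path: String,
      batchTimeMs: Long,
      recentlySelected: Set[String]): Boolean = {
    // Trailing end of the remember window for the current batch.
    val ignoreThresholdMs = batchTimeMs - rememberWindowMs
    modTimeMs > ignoreThresholdMs && !recentlySelected.contains(path)
  }

  def main(args: Array[String]): Unit = {
    val batchTimeMs = 600000L
    // Became visible 10 seconds after its mod time: inside the window.
    println(isNewFile(batchTimeMs - 10000L, "a.txt", batchTimeMs, Set.empty))
    // Became visible 2 minutes after its mod time: already below the
    // threshold, and the threshold only moves forward in later batches.
    println(isNewFile(batchTimeMs - 120000L, "b.txt", batchTimeMs, Set.empty))
    // Inside the window but already selected in a remembered batch.
    println(isNewFile(batchTimeMs - 10000L, "a.txt", batchTimeMs, Set("a.txt")))
  }
}
```

Because the ignore threshold advances with every batch while a file's mod time stays fixed, a file that first appears below the threshold stays below it in all later batches, which is why "never" describes the behavior more precisely than "not".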