Github user steveloughran commented on a diff in the pull request:
https://github.com/apache/spark/pull/14731#discussion_r103184528
--- Diff: docs/streaming-programming-guide.md ---
@@ -615,35 +615,114 @@ which creates a DStream from text data received over a TCP socket connection. Besides sockets, the StreamingContext API provides methods for creating DStreams from files as input sources.
-- **File Streams:** For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created as:
+#### File Streams
+{:.no_toc}
+
+For reading data from files on any file system compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be created via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
+
+File streams do not require running a receiver, hence they do not require allocating cores.
+
+For simple text files, the easiest method is `StreamingContext.textFileStream(dataDirectory)`.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
+{% endhighlight %}
+For text files:
+
+{% highlight scala %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+</div>
+
+<div data-lang="java" markdown="1">
+{% highlight java %}
+streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
+{% endhighlight %}
+For text files:
+
+{% highlight java %}
+streamingContext.textFileStream(dataDirectory);
+{% endhighlight %}
+</div>
- <div class="codetabs">
- <div data-lang="scala" markdown="1">
- streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
- </div>
- <div data-lang="java" markdown="1">
- streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
- </div>
- <div data-lang="python" markdown="1">
- streamingContext.textFileStream(dataDirectory)
- </div>
- </div>
+<div data-lang="python" markdown="1">
+`fileStream` is not available in the Python API; only `textFileStream` is available.
+{% highlight python %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+</div>
- Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory (files written in nested directories not supported). Note that
+</div>
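
As an illustration, a concrete instantiation of the generic signature might look like the sketch below. It assumes a `streamingContext` already in scope (as in the snippets above) and uses the classic Hadoop line-oriented pairing of `LongWritable` offsets, `Text` lines, and `TextInputFormat`; the directory is made up:

{% highlight scala %}
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Each record is a (byte offset, line) pair, as TextInputFormat presents
// text files. Convert the Text values to Strings before using them, since
// Hadoop reuses the underlying objects.
val lines = streamingContext
  .fileStream[LongWritable, Text, TextInputFormat]("hdfs://namenode:8040/logs/")
  .map { case (_, line) => line.toString }
{% endhighlight %}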
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically *moving* or *renaming* them into the data directory.
- + Once moved, the files must not be changed. So if the files are being continuously appended, the new data will not be read.
+##### How Directories are Monitored
+{:.no_toc}
- For simple text files, there is an easier method `streamingContext.textFileStream(dataDirectory)`. And file streams do not require running a receiver, hence does not require allocating cores.
+Spark Streaming will monitor the directory `dataDirectory` and process any files created in that directory.
+
+ * A simple directory can be monitored, such as `"hdfs://namenode:8040/logs/"`.
+   All files directly under such a path will be processed as they are discovered.
+ + A [POSIX glob pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02)
+   can be supplied, such as `"hdfs://namenode:8040/logs/2017/*"`.
+   Here, the DStream will consist of all files in the directories matching the pattern.
+   That is: it is a pattern of directories, not of files in directories.
+ + All files must be in the same data format.
+ * A file is considered part of a time period based on its modification time, not its creation time.
+ + Once processed, changes to a file within the current window will not cause the file to be reread.
+   That is: *updates are ignored*.
+ + The more files under a directory, the longer it will take to scan for changes, even if no files have been modified.
+ * If a wildcard is used to identify directories, such as `"hdfs://namenode:8040/logs/2016-*"`,
+   renaming an entire directory to match the path will add the directory to the list of
+   monitored directories. Only the files in the directory whose modification time is
+   within the current window will be included in the stream.
+ + Calling [`FileSystem.setTimes()`](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#setTimes-org.apache.hadoop.fs.Path-long-long-)
+   to fix the timestamp is a way to have the file picked up in a later window, even if its contents have not changed.
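
For example, a minimal sketch of such a timestamp fix via the Hadoop `FileSystem` API, assuming a hypothetical file path (`-1` leaves the access time unchanged):

{% highlight scala %}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val path = new Path("hdfs://namenode:8040/logs/2017/data.txt")  // hypothetical file
val fs = path.getFileSystem(new Configuration())
// setTimes(path, modificationTime, accessTime); -1 means "leave unchanged".
fs.setTimes(path, System.currentTimeMillis(), -1)
{% endhighlight %}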
+
+
--- End diff --
for "real" filesystems, rename doesn't change modtime, and files become
visible in create(), so if you do a create() in the dest dir the file may be
found and scanned before the data is fully written. Hence best practise: write
elsewhere and rename in. Which by its very nature is the wrong thing to do for
object stores: not only is there the perf hit of the copy, the timestamp
changes. -and you can't set the times; if you could, we'd probably do it in the
filesystem code. PUT-in-place eliminates the copy, and as the file's aren't
visible until close(), no risk of early scan
Of course, real (tm) filesystems do let you update the modtime, you can
update them after the rename. Even better: do it before so there is no time
period when they are in the dest folder with a different timestamp
If you want to make sure this section is clear about what the best
practises and surprises are for both FS and blobstore submission, suggest any
changes
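
To be concrete, the rename-based submission pattern I'd recommend for "real" filesystems looks roughly like this (paths and contents are made up; `-1` leaves the access time unchanged):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val tmp = new Path("/incoming/tmp/batch-0001.txt")    // outside the watched directory
val dest = new Path("/incoming/ready/batch-0001.txt") // the directory Spark monitors

// 1. Write the file somewhere the stream is not watching.
val out = fs.create(tmp)
out.write("some records\n".getBytes("UTF-8"))
out.close()

// 2. Set the modification time *before* the rename, so the file never sits
//    in the watched directory with a stale timestamp.
fs.setTimes(tmp, System.currentTimeMillis(), -1)

// 3. Ensure the destination directory exists, then atomically move the file in.
fs.mkdirs(dest.getParent)
fs.rename(tmp, dest)
```

For an object store you'd skip all of this and just PUT the file directly into the watched directory, since it only becomes visible on close().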