[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-04-24 Thread steveloughran
Github user steveloughran closed the pull request at:

https://github.com/apache/spark/pull/14731


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-03-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r107194624
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
@@ -27,7 +27,8 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.Path
+import org.apache.commons.io.IOUtils
--- End diff --

Actually, there's a straightforward reason: the test is using the Hadoop FS
APIs, opening an output stream from a Path and writing to it; `Files.write`
only works with a local file. It doesn't work with the Hadoop FileSystem and
Path classes, so it could only be used by abusing knowledge of path URLs.
Going through FileSystem/Path uses the same API as you'd use in production,
so it is the more rigorous test.
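
As an illustration only (the helper name is made up, not the actual
InputStreamsSuite code), a minimal sketch of that kind of Hadoop-API write:

```scala
import java.nio.charset.StandardCharsets

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Sketch: write a test file through FileSystem/Path rather than Guava's
// Files.write(File, ...), so the test exercises the same API as production.
def writeViaHadoopFs(dir: Path, name: String, text: String): Unit = {
  val fs = dir.getFileSystem(new Configuration())
  val out = fs.create(new Path(dir, name))
  try {
    out.write(text.getBytes(StandardCharsets.UTF_8))
  } finally {
    out.close()
  }
}
```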


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-03-21 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r107161471
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
@@ -557,4 +557,16 @@ trait TestSuiteBase extends SparkFunSuite with 
BeforeAndAfter with Logging {
   verifyOutput[W](output.toSeq, expectedOutput, useSet)
 }
   }
+
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will 
be deleted after `f`
+   * returns.
+   * (originally from `SqlTestUtils`.)
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempDir(f: File => Unit): Unit = {
--- End diff --

I see, it's only otherwise defined in the SQL test utils class. Well,
something we could unify one day; maybe not such a big deal here for now.


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-03-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r107152771
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
@@ -27,7 +27,8 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.Path
+import org.apache.commons.io.IOUtils
--- End diff --

I'll look at that; I think I went with IOUtils as it was on the classpath and
I'd never had bad experiences with it. Guava, well...


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-03-21 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r107152263
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
@@ -557,4 +557,16 @@ trait TestSuiteBase extends SparkFunSuite with 
BeforeAndAfter with Logging {
   verifyOutput[W](output.toSeq, expectedOutput, useSet)
 }
   }
+
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will 
be deleted after `f`
+   * returns.
+   * (originally from `SqlTestUtils`.)
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempDir(f: File => Unit): Unit = {
--- End diff --

Yes, but it's defined in a module that isn't the one these tests live in, so
using it would need more dependency logic, or pulling it up into a common
module, which, if done properly, makes for a big diff.
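
For reference, a hedged sketch of what such a helper typically looks like
(the trait name here is hypothetical; the real `SQLTestUtils` version may
differ in detail):

```scala
import java.io.File

import org.apache.spark.util.Utils

// Hypothetical home for the helper; in the PR it lives in TestSuiteBase.
trait TempDirTestSupport {
  /** Creates a temp directory, passes it to `f`, deletes it after `f` returns. */
  protected def withTempDir(f: File => Unit): Unit = {
    val dir = Utils.createTempDir().getCanonicalFile
    try f(dir) finally Utils.deleteRecursively(dir)
  }
}
```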


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-03-21 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r107140800
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
@@ -27,7 +27,8 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import com.google.common.io.Files
-import org.apache.hadoop.fs.Path
+import org.apache.commons.io.IOUtils
--- End diff --

Just use Files.write?


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-03-21 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r107140320
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala ---
@@ -557,4 +557,16 @@ trait TestSuiteBase extends SparkFunSuite with 
BeforeAndAfter with Logging {
   verifyOutput[W](output.toSeq, expectedOutput, useSet)
 }
   }
+
+  /**
+   * Creates a temporary directory, which is then passed to `f` and will 
be deleted after `f`
+   * returns.
+   * (originally from `SqlTestUtils`.)
+   * @todo Probably this method should be moved to a more general place
+   */
+  protected def withTempDir(f: File => Unit): Unit = {
--- End diff --

We don't already have this defined and available elsewhere?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-02-27 Thread uncleGen
Github user uncleGen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r103187577
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -140,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* a union RDD out of them. Note that this maintains the list of files 
that were processed
* in the latest modification time in the previous call to this method. 
This is because the
* modification time returned by the FileStatus API seems to return 
times only at the
-   * granularity of seconds. And new files may have the same modification 
time as the
+   * granularity of seconds in HDFS. And new files may have the same 
modification time as the
* latest modification time in the previous call to this method yet was 
not reported in
--- End diff --

got it


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-02-27 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r103184528
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -615,35 +615,114 @@ which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the 
StreamingContext API provides
 methods for creating DStreams from files as input sources.
 
-- **File Streams:** For reading data from files on any file system 
compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be 
created as:
+ File Streams
+{:.no_toc}
+
+For reading data from files on any file system compatible with the HDFS 
API (that is, HDFS, S3, NFS, etc.), a DStream can be created as
+via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
+
+File streams do not require running a receiver, hence does not require 
allocating cores.
+
+For simple text files, the easiest method is 
`StreamingContext.textFileStream(dataDirectory)`. 
+
+
+
+
+{% highlight scala %}
+streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
+{% endhighlight %}
+For text files
+
+{% highlight scala %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
+
+
+{% highlight java %}
+streamingContext.fileStream(dataDirectory);
+{% endhighlight %}
+For text files
+
+{% highlight java %}
+streamingContext.textFileStream(dataDirectory);
+{% endhighlight %}
+
 
-
-
-streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
-
-
-   streamingContext.fileStream(dataDirectory);
-
-
-   streamingContext.textFileStream(dataDirectory)
-
-
+
+`fileStream` is not available in the Python API; only `textFileStream` is 
available.
+{% highlight python %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+
 
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
+# How Directories are Monitored
+{:.no_toc}
 
-   For simple text files, there is an easier method 
`streamingContext.textFileStream(dataDirectory)`. And file streams do not 
require running a receiver, hence does not require allocating cores.
+Spark Streaming will monitor the directory `dataDirectory` and process any 
files created in that directory.
+
+   * A simple directory can be monitored, such as 
`"hdfs://namenode:8040/logs/"`.
+ All files directly under such a path will be processed as they are 
discovered.
+   + A [POSIX glob 
pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02)
 can be supplied, such as
+ `"hdfs://namenode:8040/logs/2017/*"`.
+ Here, the DStream will consist of all files in the directories
+ matching the pattern.
+ That is: it is a pattern of directories, not of files in directories.
+   + All files must be in the same data format.
+   * A file is considered part of a time period based on its modification 
time,
+ not its creation time.
+   + Once processed, changes to a file within the current window will not 
cause the file to be reread.
+ That is: *updates are ignored*.
+   + The more files under a directory, the longer it will take to
+ scan for changes — even if no files have been modified.
+   * If a wildcard is used to identify directories, such as 
`"hdfs://namenode:8040/logs/2016-*"`,
+ renaming an entire directory to match the path will add the directory 
to the list of
+ monitored directories. Only the files in the directory whose 
modification time is
+ within the current window will be included in the stream.
+   + Calling 
[`FileSystem.setTimes()`](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#setTimes-org.apache.hadoop.fs.Path-long-long-)
+ to fix the timestamp is a way to have the file picked up in a later 
window, even if its contents have not changed.
+
+
--- End diff --

For "real" filesystems, rename doesn't change the modtime, and files become
visible at create(), so if you do a create() in the destination dir, the file
may be found and scanned before the data is fully written. Hence the best
practice: write elsewhere and rename into the monitored directory once the
output stream is closed.
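
A minimal sketch of that write-then-rename pattern, assuming a Hadoop
`FileSystem` handle (the method name and paths are illustrative):

```scala
import java.nio.charset.StandardCharsets

import org.apache.hadoop.fs.{FileSystem, Path}

// Write to an unmonitored staging path, close the stream, then rename into
// the watched directory, so the file only appears there once it is complete.
// On HDFS, rename() within the same filesystem is atomic.
def publishAtomically(fs: FileSystem, staging: Path, destDir: Path, data: String): Unit = {
  val out = fs.create(staging)
  try {
    out.write(data.getBytes(StandardCharsets.UTF_8))
  } finally {
    out.close() // length and modification time are final only after close()
  }
  fs.rename(staging, new Path(destDir, staging.getName))
}
```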

[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-02-27 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r103183646
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -140,7 +137,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
* a union RDD out of them. Note that this maintains the list of files 
that were processed
* in the latest modification time in the previous call to this method. 
This is because the
* modification time returned by the FileStatus API seems to return 
times only at the
-   * granularity of seconds. And new files may have the same modification 
time as the
+   * granularity of seconds in HDFS. And new files may have the same 
modification time as the
* latest modification time in the previous call to this method yet was 
not reported in
--- End diff --

No, it really is HDFS alone. We have no idea whatsoever about the granularity
of other filesystems. It could be 2 seconds (is FAT32 supported? Hope not);
NTFS is in nanoseconds
[apparently](https://msdn.microsoft.com/en-us/library/windows/desktop/ms724290(v=vs.85).aspx).


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-02-24 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r102977480
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -630,35 +630,106 @@ which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the 
StreamingContext API provides
 methods for creating DStreams from files as input sources.
 
-- **File Streams:** For reading data from files on any file system 
compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be 
created as:
+ File Streams
+{:.no_toc}
+
+For reading data from files on any file system compatible with the HDFS 
API (that is, HDFS, S3, NFS, etc.), a DStream can be created as
+via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
+
+File streams do not require running a receiver, hence does not require 
allocating cores.
 
-
-
-streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
-
-
-   streamingContext.fileStream(dataDirectory);
-
-
-   streamingContext.textFileStream(dataDirectory)
-
-
+For simple text files, the easiest method is 
`StreamingContext.textFileStream(dataDirectory)`. 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+
+
 
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
+{% highlight scala %}
+streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
+{% endhighlight %}
+For text files
+
+{% highlight scala %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
 
-   For simple text files, there is an easier method 
`streamingContext.textFileStream(dataDirectory)`. And file streams do not 
require running a receiver, hence does not require allocating cores.
+
+{% highlight java %}
+streamingContext.fileStream(dataDirectory);
+{% endhighlight %}
+For text files
 
-   Python API 
`fileStream` is not available in the Python API, only  `textFileStream` is  
   available.
+{% highlight java %}
+streamingContext.textFileStream(dataDirectory);
+{% endhighlight %}
+
 
-- **Streams based on Custom Receivers:** DStreams can be created with data 
streams received through custom receivers. See the [Custom Receiver
+
+`fileStream` is not available in the Python API; only `textFileStream` is 
available.
+{% highlight python %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
+
+
+
+# How Directories are Monitored
+{:.no_toc}
+
+Spark Streaming will monitor the directory `dataDirectory` and process any 
files created in that directory.
+
+   * A simple directory can be monitored, such as 
`hdfs://namenode:8040/logs/`.
+ All files directly under such a path will be processed as they are 
discovered.
+   + A [POSIX glob 
pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02)
 can be supplied, such as
+ `hdfs://namenode:8040/logs/2017/*`.
+ Here, the DStream will consist of all files in the directories
+ matching the pattern.
+ That is: it is a pattern of directories, not of files in directories.
+   + All files must be in the same data format.
+   * A file is considered part of a time period based on its modification 
time,
+ not its creation time.
+   + Once processed, changes to a file within the current window will not 
cause the file to be reread.
+ That is: *updates are ignored*.
+   + The more files under a directory, the longer it will take to
+ scan for changes —even if no files have been modified.
--- End diff --

fixed


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r97367042
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, 
V]](
   logDebug(s"Getting new files for time $currentTime, " +
 s"ignoring files older than $modTimeIgnoreThreshold")
 
-  val newFileFilter = new PathFilter {
-def accept(path: Path): Boolean = isNewFile(path, currentTime, 
modTimeIgnoreThreshold)
-  }
-  val directoryFilter = new PathFilter {
-override def accept(path: Path): Boolean = 
fs.getFileStatus(path).isDirectory
-  }
-  val directories = fs.globStatus(directoryPath, 
directoryFilter).map(_.getPath)
+  val directories = 
Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
--- End diff --

I believe the getOrElse clause should handle the globStatus() return value
being null, upconverting it to an empty array. That is essentially working
backwards from this special (awful) case in the Glob code:
```java
if ((!sawWildcard) && results.isEmpty() &&
(flattenedPatterns.size() <= 1)) {
  return null;
}
```
With `directories` now being an empty array, `flatMap()` effectively starts
with a `map` over a zero-element array, which will result in the `newFiles`
value also being empty. Isn't that right?
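
As a standalone illustration of that null handling (not the exact
`FileInputDStream` code), assuming a Hadoop `FileSystem` and a directory
glob:

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// globStatus() returns null when no wildcard was seen and nothing matched;
// Option(...).getOrElse upconverts that to an empty array, so the flatMap
// below simply yields an empty file list with no extra isEmpty check.
def filesUnderMatchingDirs(fs: FileSystem, directoryPath: Path): Seq[Path] = {
  val directories = Option(fs.globStatus(directoryPath))
    .getOrElse(Array.empty[FileStatus])
    .filter(_.isDirectory)
    .map(_.getPath)
  directories.flatMap(dir => fs.listStatus(dir).map(_.getPath)).toSeq
}
```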


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r97363419
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
@@ -235,18 +236,97 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
 }
 // Over time, create files in the temp directory 1
 val input1 = Seq(1, 2, 3, 4, 5)
-input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1))
+input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))
 
 // Over time, create files in the temp directory 1
 val input2 = Seq(6, 7, 8, 9, 10)
-input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2))
+input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))
 
 // Verify that all the files have been read
 val expectedOutput = (input1 ++ input2).map(_.toString).toSet
 assert(outputQueue.asScala.flatten.toSet === expectedOutput)
   }
 } finally {
-  if (testDir != null) Utils.deleteRecursively(testDir)
+  if (testDir != null) {
+Utils.deleteRecursively(testDir)
+  }
+}
+  }
+
+  /**
+   * Tests that renamed directories are included in new batches -but that 
only files created
+   * within the batch window are included.
+   * Uses the Hadoop APIs to verify consistent behavior with the 
operations used internally.
+   */
+  test("renamed directories are scanned") {
+val testDir = Utils.createTempDir()
--- End diff --

That's in `SQLTestUtils`, which the streaming suite can't pick up without
adding spark-sql:test-jar to the test classpath. The alternatives are to copy
& paste it into streaming (`org.apache.spark.streaming.TestSuiteBase`), or to
copy & paste it somewhere into spark-core:test-jar, which would be a bigger
change.

How about I add it to the Spark Streaming test base and then move the other
tests in InputStreamsSuite to it, either in this patch or a successor? That
way: cleaner tests.




[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-08 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r95079472
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -630,35 +630,106 @@ which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the 
StreamingContext API provides
 methods for creating DStreams from files as input sources.
 
-- **File Streams:** For reading data from files on any file system 
compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be 
created as:
+ File Streams
+{:.no_toc}
+
+For reading data from files on any file system compatible with the HDFS 
API (that is, HDFS, S3, NFS, etc.), a DStream can be created as
+via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
+
+File streams do not require running a receiver, hence does not require 
allocating cores.
 
-
-
-streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
-
-
-   streamingContext.fileStream(dataDirectory);
-
-
-   streamingContext.textFileStream(dataDirectory)
-
-
+For simple text files, the easiest method is 
`StreamingContext.textFileStream(dataDirectory)`. 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+
+
 
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
+{% highlight scala %}
+streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
+{% endhighlight %}
+For text files
+
+{% highlight scala %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
 
-   For simple text files, there is an easier method 
`streamingContext.textFileStream(dataDirectory)`. And file streams do not 
require running a receiver, hence does not require allocating cores.
+
+{% highlight java %}
+streamingContext.fileStream(dataDirectory);
+{% endhighlight %}
+For text files
 
-   Python API 
`fileStream` is not available in the Python API, only  `textFileStream` is  
   available.
+{% highlight java %}
+streamingContext.textFileStream(dataDirectory);
+{% endhighlight %}
+
 
-- **Streams based on Custom Receivers:** DStreams can be created with data 
streams received through custom receivers. See the [Custom Receiver
+
+`fileStream` is not available in the Python API; only `textFileStream` is 
available.
+{% highlight python %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
+
+
+
+# How Directories are Monitored
+{:.no_toc}
+
+Spark Streaming will monitor the directory `dataDirectory` and process any 
files created in that directory.
+
+   * A simple directory can be monitored, such as 
`hdfs://namenode:8040/logs/`.
+ All files directly under such a path will be processed as they are 
discovered.
+   + A [POSIX glob 
pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02)
 can be supplied, such as
+ `hdfs://namenode:8040/logs/2017/*`.
+ Here, the DStream will consist of all files in the directories
+ matching the pattern.
+ That is: it is a pattern of directories, not of files in directories.
+   + All files must be in the same data format.
+   * A file is considered part of a time period based on its modification 
time,
+ not its creation time.
+   + Once processed, changes to a file within the current window will not 
cause the file to be reread.
+ That is: *updates are ignored*.
+   + The more files under a directory, the longer it will take to
+ scan for changes —even if no files have been modified.
+   * If a wildcard is used to identify directories, such as 
`hdfs://namenode:8040/logs/2016-*`,
+ renaming an entire directory to match the path will add the directory 
to the list of
+ monitored directories. Only the files in the directory whose 
modification time is
+ within the current window will be included in the stream.
+   + Calling `FileSystem.setTimes()` to fix the timestamp is a way to have 
the file picked
+ up in a later window, even if its contents have not changed.
+
+
+# Streaming to FileSystems vs Object stores
+{:.no_toc}
+
+"Full" Filesystems such as HDFS tend to set the modification time 

[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r95058504
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -630,35 +630,106 @@ which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the 
StreamingContext API provides
 methods for creating DStreams from files as input sources.
 
-- **File Streams:** For reading data from files on any file system 
compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be 
created as:
+ File Streams
+{:.no_toc}
+
+For reading data from files on any file system compatible with the HDFS 
API (that is, HDFS, S3, NFS, etc.), a DStream can be created as
+via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
+
+File streams do not require running a receiver, hence does not require 
allocating cores.
 
-
-
-streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
-
-
-   streamingContext.fileStream(dataDirectory);
-
-
-   streamingContext.textFileStream(dataDirectory)
-
-
+For simple text files, the easiest method is 
`StreamingContext.textFileStream(dataDirectory)`. 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+
+
 
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
+{% highlight scala %}
+streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
+{% endhighlight %}
+For text files
+
+{% highlight scala %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
 
-   For simple text files, there is an easier method 
`streamingContext.textFileStream(dataDirectory)`. And file streams do not 
require running a receiver, hence does not require allocating cores.
+
+{% highlight java %}
+streamingContext.fileStream(dataDirectory);
+{% endhighlight %}
+For text files
 
-   Python API 
`fileStream` is not available in the Python API, only  `textFileStream` is  
   available.
+{% highlight java %}
+streamingContext.textFileStream(dataDirectory);
+{% endhighlight %}
+
 
-- **Streams based on Custom Receivers:** DStreams can be created with data 
streams received through custom receivers. See the [Custom Receiver
+
+`fileStream` is not available in the Python API; only `textFileStream` is 
available.
+{% highlight python %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
+
+
+
+# How Directories are Monitored
+{:.no_toc}
+
+Spark Streaming will monitor the directory `dataDirectory` and process any 
files created in that directory.
+
+   * A simple directory can be monitored, such as 
`hdfs://namenode:8040/logs/`.
+ All files directly under such a path will be processed as they are 
discovered.
+   + A [POSIX glob 
pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02)
 can be supplied, such as
+ `hdfs://namenode:8040/logs/2017/*`.
+ Here, the DStream will consist of all files in the directories
+ matching the pattern.
+ That is: it is a pattern of directories, not of files in directories.
+   + All files must be in the same data format.
+   * A file is considered part of a time period based on its modification 
time,
+ not its creation time.
+   + Once processed, changes to a file within the current window will not 
cause the file to be reread.
+ That is: *updates are ignored*.
+   + The more files under a directory, the longer it will take to
+ scan for changes —even if no files have been modified.
--- End diff --

Nits now: space after hyphen if you please


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r95058826
  
--- Diff: 
streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala ---
@@ -235,18 +236,97 @@ class InputStreamsSuite extends TestSuiteBase with 
BeforeAndAfter {
 }
 // Over time, create files in the temp directory 1
 val input1 = Seq(1, 2, 3, 4, 5)
-input1.foreach(i => createFileAndAdvenceTime(i, testSubDir1))
+input1.foreach(i => createFileAndAdvanceTime(i, testSubDir1))
 
 // Over time, create files in the temp directory 1
 val input2 = Seq(6, 7, 8, 9, 10)
-input2.foreach(i => createFileAndAdvenceTime(i, testSubDir2))
+input2.foreach(i => createFileAndAdvanceTime(i, testSubDir2))
 
 // Verify that all the files have been read
 val expectedOutput = (input1 ++ input2).map(_.toString).toSet
 assert(outputQueue.asScala.flatten.toSet === expectedOutput)
   }
 } finally {
-  if (testDir != null) Utils.deleteRecursively(testDir)
+  if (testDir != null) {
+Utils.deleteRecursively(testDir)
+  }
+}
+  }
+
+  /**
+   * Tests that renamed directories are included in new batches -but that 
only files created
+   * within the batch window are included.
+   * Uses the Hadoop APIs to verify consistent behavior with the 
operations used internally.
+   */
+  test("renamed directories are scanned") {
+val testDir = Utils.createTempDir()
--- End diff --

Can you use `withTempDir { testDir =>`? If so, I think several tests here
could use the same pattern.


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r95058534
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -630,35 +630,106 @@ which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the 
StreamingContext API provides
 methods for creating DStreams from files as input sources.
 
-- **File Streams:** For reading data from files on any file system 
compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be 
created as:
+ File Streams
+{:.no_toc}
+
+For reading data from files on any file system compatible with the HDFS 
API (that is, HDFS, S3, NFS, etc.), a DStream can be created as
+via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
+
+File streams do not require running a receiver, hence does not require 
allocating cores.
 
-
-
-streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
-
-
-   streamingContext.fileStream(dataDirectory);
-
-
-   streamingContext.textFileStream(dataDirectory)
-
-
+For simple text files, the easiest method is 
`StreamingContext.textFileStream(dataDirectory)`. 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+
+
 
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
+{% highlight scala %}
+streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
+{% endhighlight %}
+For text files
+
+{% highlight scala %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
 
-   For simple text files, there is an easier method 
`streamingContext.textFileStream(dataDirectory)`. And file streams do not 
require running a receiver, hence does not require allocating cores.
+
+{% highlight java %}
+streamingContext.fileStream(dataDirectory);
+{% endhighlight %}
+For text files
 
-   Python API 
`fileStream` is not available in the Python API, only  `textFileStream` is  
   available.
+{% highlight java %}
+streamingContext.textFileStream(dataDirectory);
+{% endhighlight %}
+
 
-- **Streams based on Custom Receivers:** DStreams can be created with data 
streams received through custom receivers. See the [Custom Receiver
+
+`fileStream` is not available in the Python API; only `textFileStream` is 
available.
+{% highlight python %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
+
+
+
+# How Directories are Monitored
+{:.no_toc}
+
+Spark Streaming will monitor the directory `dataDirectory` and process any 
files created in that directory.
+
+   * A simple directory can be monitored, such as 
`hdfs://namenode:8040/logs/`.
+ All files directly under such a path will be processed as they are 
discovered.
+   + A [POSIX glob 
pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02)
 can be supplied, such as
+ `hdfs://namenode:8040/logs/2017/*`.
+ Here, the DStream will consist of all files in the directories
+ matching the pattern.
+ That is: it is a pattern of directories, not of files in directories.
+   + All files must be in the same data format.
+   * A file is considered part of a time period based on its modification 
time,
+ not its creation time.
+   + Once processed, changes to a file within the current window will not 
cause the file to be reread.
+ That is: *updates are ignored*.
+   + The more files under a directory, the longer it will take to
+ scan for changes —even if no files have been modified.
+   * If a wildcard is used to identify directories, such as 
`hdfs://namenode:8040/logs/2016-*`,
+ renaming an entire directory to match the path will add the directory 
to the list of
+ monitored directories. Only the files in the directory whose 
modification time is
+ within the current window will be included in the stream.
+   + Calling `FileSystem.setTimes()` to fix the timestamp is a way to have 
the file picked
--- End diff --

It could be useful to link to the Hadoop javadoc for this method or at 
least make clear that this is an HDFS API call, not Spark


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r95058852
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, 
V]](
   logDebug(s"Getting new files for time $currentTime, " +
 s"ignoring files older than $modTimeIgnoreThreshold")
 
-  val newFileFilter = new PathFilter {
-def accept(path: Path): Boolean = isNewFile(path, currentTime, 
modTimeIgnoreThreshold)
-  }
-  val directoryFilter = new PathFilter {
-override def accept(path: Path): Boolean = 
fs.getFileStatus(path).isDirectory
-  }
-  val directories = fs.globStatus(directoryPath, 
directoryFilter).map(_.getPath)
+  val directories = 
Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
--- End diff --

Do you need to check for `directories.isEmpty` after this to catch this 
error case earlier?


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r95058662
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -630,35 +630,106 @@ which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the 
StreamingContext API provides
 methods for creating DStreams from files as input sources.
 
-- **File Streams:** For reading data from files on any file system 
compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be 
created as:
+ File Streams
+{:.no_toc}
+
+For reading data from files on any file system compatible with the HDFS 
API (that is, HDFS, S3, NFS, etc.), a DStream can be created as
+via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
+
+File streams do not require running a receiver, hence does not require 
allocating cores.
 
-
-
-streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
-
-
-   streamingContext.fileStream(dataDirectory);
-
-
-   streamingContext.textFileStream(dataDirectory)
-
-
+For simple text files, the easiest method is 
`StreamingContext.textFileStream(dataDirectory)`. 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+
+
 
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
+{% highlight scala %}
+streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
+{% endhighlight %}
+For text files
+
+{% highlight scala %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
 
-   For simple text files, there is an easier method 
`streamingContext.textFileStream(dataDirectory)`. And file streams do not 
require running a receiver, hence does not require allocating cores.
+
+{% highlight java %}
+streamingContext.fileStream(dataDirectory);
+{% endhighlight %}
+For text files
 
-   Python API 
`fileStream` is not available in the Python API, only  `textFileStream` is  
   available.
+{% highlight java %}
+streamingContext.textFileStream(dataDirectory);
+{% endhighlight %}
+
 
-- **Streams based on Custom Receivers:** DStreams can be created with data 
streams received through custom receivers. See the [Custom Receiver
+
+`fileStream` is not available in the Python API; only `textFileStream` is 
available.
+{% highlight python %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
+
+
+
+# How Directories are Monitored
+{:.no_toc}
+
+Spark Streaming will monitor the directory `dataDirectory` and process any 
files created in that directory.
+
+   * A simple directory can be monitored, such as 
`hdfs://namenode:8040/logs/`.
+ All files directly under such a path will be processed as they are 
discovered.
+   + A [POSIX glob 
pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02)
 can be supplied, such as
+ `hdfs://namenode:8040/logs/2017/*`.
+ Here, the DStream will consist of all files in the directories
+ matching the pattern.
+ That is: it is a pattern of directories, not of files in directories.
+   + All files must be in the same data format.
+   * A file is considered part of a time period based on its modification 
time,
+ not its creation time.
+   + Once processed, changes to a file within the current window will not 
cause the file to be reread.
+ That is: *updates are ignored*.
+   + The more files under a directory, the longer it will take to
+ scan for changes —even if no files have been modified.
+   * If a wildcard is used to identify directories, such as 
`hdfs://namenode:8040/logs/2016-*`,
+ renaming an entire directory to match the path will add the directory 
to the list of
+ monitored directories. Only the files in the directory whose 
modification time is
+ within the current window will be included in the stream.
+   + Calling `FileSystem.setTimes()` to fix the timestamp is a way to have 
the file picked
+ up in a later window, even if its contents have not changed.
+
+
+# Streaming to FileSystems vs Object stores
+{:.no_toc}
+
+"Full" Filesystems such as HDFS tend to set the modification time on thei

[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r95058521
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -630,35 +630,106 @@ which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the 
StreamingContext API provides
 methods for creating DStreams from files as input sources.
 
-- **File Streams:** For reading data from files on any file system 
compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be 
created as:
+ File Streams
+{:.no_toc}
+
+For reading data from files on any file system compatible with the HDFS 
API (that is, HDFS, S3, NFS, etc.), a DStream can be created as
+via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
+
+File streams do not require running a receiver, hence does not require 
allocating cores.
 
-
-
-streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
-
-
-   streamingContext.fileStream(dataDirectory);
-
-
-   streamingContext.textFileStream(dataDirectory)
-
-
+For simple text files, the easiest method is 
`StreamingContext.textFileStream(dataDirectory)`. 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+
+
 
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
+{% highlight scala %}
+streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
+{% endhighlight %}
+For text files
--- End diff --

Space before, and a colon at the end of the line?


[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-07 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r95058508
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -630,35 +630,106 @@ which creates a DStream from text
 data received over a TCP socket connection. Besides sockets, the 
StreamingContext API provides
 methods for creating DStreams from files as input sources.
 
-- **File Streams:** For reading data from files on any file system 
compatible with the HDFS API (that is, HDFS, S3, NFS, etc.), a DStream can be 
created as:
+ File Streams
+{:.no_toc}
+
+For reading data from files on any file system compatible with the HDFS 
API (that is, HDFS, S3, NFS, etc.), a DStream can be created as
+via `StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]`.
+
+File streams do not require running a receiver, hence does not require 
allocating cores.
 
-
-
-streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
-
-
-   streamingContext.fileStream(dataDirectory);
-
-
-   streamingContext.textFileStream(dataDirectory)
-
-
+For simple text files, the easiest method is 
`StreamingContext.textFileStream(dataDirectory)`. 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+
+
 
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
+{% highlight scala %}
+streamingContext.fileStream[KeyClass, ValueClass, 
InputFormatClass](dataDirectory)
+{% endhighlight %}
+For text files
+
+{% highlight scala %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
 
-   For simple text files, there is an easier method 
`streamingContext.textFileStream(dataDirectory)`. And file streams do not 
require running a receiver, hence does not require allocating cores.
+
+{% highlight java %}
+streamingContext.fileStream(dataDirectory);
+{% endhighlight %}
+For text files
 
-   Python API 
`fileStream` is not available in the Python API, only  `textFileStream` is  
   available.
+{% highlight java %}
+streamingContext.textFileStream(dataDirectory);
+{% endhighlight %}
+
 
-- **Streams based on Custom Receivers:** DStreams can be created with data 
streams received through custom receivers. See the [Custom Receiver
+
+`fileStream` is not available in the Python API; only `textFileStream` is 
available.
+{% highlight python %}
+streamingContext.textFileStream(dataDirectory)
+{% endhighlight %}
+
+
+
+
+# How Directories are Monitored
+{:.no_toc}
+
+Spark Streaming will monitor the directory `dataDirectory` and process any 
files created in that directory.
+
+   * A simple directory can be monitored, such as 
`hdfs://namenode:8040/logs/`.
+ All files directly under such a path will be processed as they are 
discovered.
+   + A [POSIX glob 
pattern](http://pubs.opengroup.org/onlinepubs/009695399/utilities/xcu_chap02.html#tag_02_13_02)
 can be supplied, such as
+ `hdfs://namenode:8040/logs/2017/*`.
+ Here, the DStream will consist of all files in the directories
+ matching the pattern.
+ That is: it is a pattern of directories, not of files in directories.
+   + All files must be in the same data format.
+   * A file is considered part of a time period based on its modification 
time,
+ not its creation time.
+   + Once processed, changes to a file within the current window will not 
cause the file to be reread.
+ That is: *updates are ignored*.
+   + The more files under a directory, the longer it will take to
+ scan for changes —even if no files have been modified.
+   * If a wildcard is used to identify directories, such as 
`hdfs://namenode:8040/logs/2016-*`,
+ renaming an entire directory to match the path will add the directory 
to the list of
+ monitored directories. Only the files in the directory whose 
modification time is
+ within the current window will be included in the stream.
+   + Calling `FileSystem.setTimes()` to fix the timestamp is a way to have 
the file picked
+ up in a later window, even if its contents have not changed.
+
+
+# Streaming to FileSystems vs Object stores
+{:.no_toc}
+
+"Full" Filesystems such as HDFS tend to set the modification time on thei

[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-03 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r94410195
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,17 +644,90 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ ++ The files must have the same data format.
+ + A simple directory can be monitored, such as 
`hdfs://namenode:8040/logs/`.
+   All files directly such a path will be processed as they are 
discovered.
+ + A POSIX glob pattern can be supplied, such as
+   `hdfs://namenode:8040/logs/2016-??-31`.
+   Here, the DStream will consist of all files directly under those 
directories
+   matching the regular expression.
+   That is: it is a pattern of directories, not of files in 
directories.
+ + All files must be in the same data format.
+ * A file is considered part of a time period based on its 
modification time
+   —not its creation time.
+ + Files must be created in/moved under the `dataDirectory` 
directory/directories by
+   an atomic operation. In HDFS and similar filesystems, this can be 
done *renaming* them
+   into the data directory from another part of the same filesystem.
+ * If a wildcard is used to identify directories, such as 
`hdfs://namenode:8040/logs/2016-*`,
+   renaming an entire directory to match the path will add the 
directory to the list of
+   monitored directories. Only the files in the directory whose 
modification time is
+   within the current window will be included in the stream.
+ + Once processed, changes to a file within the current window will 
not cause the file to be reread.
+   That is: *updates are ignored*.
+ + The more files under a directory/wildcard pattern, the longer it 
will take to
+   scan for changes —even if no files have actually changed.
+ + Calling `FileSystem.setTimes()` to fix the timestamp is a way to 
have the file picked
+   up in a later window, even if its contents have not changed.
 
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
 
For simple text files, there is an easier method 
`streamingContext.textFileStream(dataDirectory)`. And file streams do not 
require running a receiver, hence does not require allocating cores.
 
Python API 
`fileStream` is not available in the Python API, only  `textFileStream` is  
   available.
 
+
+Special points for HDFS
+
+The HDFS filesystem does not update a file's modification time while the file 
is being written to.
+Specifically:
+
++ `FileSystem.create()` creation: a zero-byte file is listed; creation 
and modification times are
+  set to the current time as seen on the NameNode.
+* Writes to a file via the output stream returned in the `create()` 
call: the modification
+  time *does not change*.
+* When `OutputStream.close()` is called, all remaining data is 
written, the file is closed, and
+  the NameNode is updated with the final size of the file. The 
modification time is set to
+  the time the file was closed.
+* Opening a file for appending via an `append()` operation does not 
change the modification
+  time of the file until the `close()` call is made on the output 
stream.
+* `FileSystem.setTimes()` can be used to explicitly set the time on a 
file (see the sketch after this list).
+* The rarely used operations:  `FileSystem.concat()`, 
`createSnapshot()`, `createSymlink()` and
+  `truncate()` all update the modification time.  
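
A minimal sketch of the `FileSystem.setTimes()` call mentioned above, used to bump an unchanged file's modification time so that it falls into a later scanning window; the path is an illustrative assumption.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val file = new Path("hdfs://namenode:8040/logs/2016-05-31/events.log")
val fs: FileSystem = file.getFileSystem(new Configuration())

// Set the modification time to "now"; the -1 leaves the access time untouched.
fs.setTimes(file, System.currentTimeMillis(), -1)
```

The contents are unchanged; the file merely looks newly modified to the directory scanner.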
+ 
+Together, this means that when a file is opened, even before data has 
been completely written,
+it may be included in the DStream, after which updates to the file 
within the same window
+will be ignored. That is: changes may be missed, and data omitted from 
the stream. 
+To guarantee that changes are picked up in a window, write the file
+to an unmonitored directory, then immediately after the output stream 
is closed,
+rename it into the destination directory. 
+Provided the renamed file appears in the scanned destination directory 
during the window
+of its creation, the new data will be picked up.
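
A sketch of that write-then-rename pattern with the Hadoop `FileSystem` API; the staging and destination paths and the payload are illustrative assumptions.

```scala
import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val monitored = new Path("hdfs://namenode:8040/logs/2016-05-31")
val staging = new Path("hdfs://namenode:8040/staging/events.log")
val fs: FileSystem = monitored.getFileSystem(new Configuration())

// 1. Write the complete file somewhere the stream is not watching.
val out = fs.create(staging, true)
try {
  out.write("event-1\nevent-2\n".getBytes(StandardCharsets.UTF_8))
} finally {
  out.close()  // the modification time is fixed here
}

// 2. Rename it into the monitored directory, so it appears there atomically
//    with a final, stable modification time.
fs.rename(staging, new Path(monitored, "events.log"))
```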
+
+O

[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-03 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r94407382
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,17 +644,90 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + The files must have the same data format.
+ + A simple directory can be monitored, such as 
`hdfs://namenode:8040/logs/`.
+   All files directly under such a path will be processed as they are 
discovered.
+ + A POSIX glob pattern can be supplied, such as
+   `hdfs://namenode:8040/logs/2016-??-31`.
+   Here, the DStream will consist of all files directly under those 
directories
+   matching the regular expression.
+   That is: it is a pattern of directories, not of files in 
directories.
+ + All files must be in the same data format.
+ * A file is considered part of a time period based on its 
modification time
+   —not its creation time.
+ + Files must be created in/moved under the `dataDirectory` 
directory/directories by
+   an atomic operation. In HDFS and similar filesystems, this can be 
done by *renaming* them
+   into the data directory from another part of the same filesystem.
+ * If a wildcard is used to identify directories, such as 
`hdfs://namenode:8040/logs/2016-*`,
+   renaming an entire directory to match the path will add the 
directory to the list of
+   monitored directories. Only the files in the directory whose 
modification time is
+   within the current window will be included in the stream.
+ + Once processed, changes to a file within the current window will 
not cause the file to be reread.
+   That is: *updates are ignored*.
+ + The more files under a directory/wildcard pattern, the longer it 
will take to
+   scan for changes —even if no files have actually changed.
+ + Calling `FileSystem.setTimes()` to fix the timestamp is a way to 
have the file picked
+   up in a later window, even if its contents have not changed.
 
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
 
For simple text files, there is an easier method 
`streamingContext.textFileStream(dataDirectory)`. And file streams do not 
require running a receiver, hence do not require allocating cores.
 
Python API 
`fileStream` is not available in the Python API, only  `textFileStream` is  
   available.
 
+
+Special points for HDFS
--- End diff --

done


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2017-01-03 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r94407115
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,17 +644,90 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + The files must have the same data format.
+ + A simple directory can be monitored, such as 
`hdfs://namenode:8040/logs/`.
+   All files directly under such a path will be processed as they are 
discovered.
+ + A POSIX glob pattern can be supplied, such as
+   `hdfs://namenode:8040/logs/2016-??-31`.
+   Here, the DStream will consist of all files directly under those 
directories
+   matching the regular expression.
--- End diff --

I added a link to the posix docs. If you follow them, you eventually end up 
on some coverage of regexps inside []; the Hadoop Glob code does actually 
convert the shell expression to a java regexp, then compile it in, so 
presumably should handle everything that the regexp engine (originally 
{{java.util.regex}}, currently {{com.google.re2j}}) can compile. That's too 
much detail and something that should really be covered in the Hadoop docs by 
someone.
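
For anyone who wants to check what a given pattern expands to, here is a small sketch driving the Hadoop globber directly; the null guard is needed because `globStatus` returns null for a wildcard-free path that does not exist, and the pattern is the one used in the doc text above.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

val pattern = new Path("hdfs://namenode:8040/logs/2016-??-31")
val fs = pattern.getFileSystem(new Configuration())

// Expand the glob and keep only directories, mirroring what the file stream does.
val dirs: Array[FileStatus] =
  Option(fs.globStatus(pattern))
    .getOrElse(Array.empty[FileStatus])
    .filter(_.isDirectory)

dirs.foreach(d => println(d.getPath))
```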


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-10-14 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r83490975
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,17 +644,90 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + The files must have the same data format.
+ + A simple directory can be monitored, such as 
`hdfs://namenode:8040/logs/`.
+   All files directly under such a path will be processed as they are 
discovered.
+ + A POSIX glob pattern can be supplied, such as
+   `hdfs://namenode:8040/logs/2016-??-31`.
+   Here, the DStream will consist of all files directly under those 
directories
+   matching the regular expression.
--- End diff --

I think this comment is still incorrect. A glob is not a regex. If this is 
just the syntax other Hadoop APIs support, that seems reasonable, but it should 
be described that way.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-10-14 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r83491041
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,17 +644,90 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + The files must have the same data format.
+ + A simple directory can be monitored, such as 
`hdfs://namenode:8040/logs/`.
+   All files directly under such a path will be processed as they are 
discovered.
+ + A POSIX glob pattern can be supplied, such as
+   `hdfs://namenode:8040/logs/2016-??-31`.
+   Here, the DStream will consist of all files directly under those 
directories
+   matching the regular expression.
+   That is: it is a pattern of directories, not of files in 
directories.
+ + All files must be in the same data format.
+ * A file is considered part of a time period based on its 
modification time
+   —not its creation time.
+ + Files must be created in/moved under the `dataDirectory` 
directory/directories by
+   an atomic operation. In HDFS and similar filesystems, this can be 
done by *renaming* them
+   into the data directory from another part of the same filesystem.
+ * If a wildcard is used to identify directories, such as 
`hdfs://namenode:8040/logs/2016-*`,
+   renaming an entire directory to match the path will add the 
directory to the list of
+   monitored directories. Only the files in the directory whose 
modification time is
+   within the current window will be included in the stream.
+ + Once processed, changes to a file within the current window will 
not cause the file to be reread.
+   That is: *updates are ignored*.
+ + The more files under a directory/wildcard pattern, the longer it 
will take to
+   scan for changes —even if no files have actually changed.
+ + Calling `FileSystem.setTimes()` to fix the timestamp is a way to 
have the file picked
+   up in a later window, even if its contents have not changed.
 
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
 
For simple text files, there is an easier method 
`streamingContext.textFileStream(dataDirectory)`. And file streams do not 
require running a receiver, hence do not require allocating cores.
 
Python API 
`fileStream` is not available in the Python API, only  `textFileStream` is  
   available.
 
+
+Special points for HDFS
--- End diff --

Does this need to start a new section in the markdown?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-10-14 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r83491136
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,17 +644,90 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + The files must have the same data format.
+ + A simple directory can be monitored, such as 
`hdfs://namenode:8040/logs/`.
+   All files directly under such a path will be processed as they are 
discovered.
+ + A POSIX glob pattern can be supplied, such as
+   `hdfs://namenode:8040/logs/2016-??-31`.
+   Here, the DStream will consist of all files directly under those 
directories
+   matching the regular expression.
+   That is: it is a pattern of directories, not of files in 
directories.
+ + All files must be in the same data format.
+ * A file is considered part of a time period based on its 
modification time
+   —not its creation time.
+ + Files must be created in/moved under the `dataDirectory` 
directory/directories by
+   an atomic operation. In HDFS and similar filesystems, this can be 
done by *renaming* them
+   into the data directory from another part of the same filesystem.
+ * If a wildcard is used to identify directories, such as 
`hdfs://namenode:8040/logs/2016-*`,
+   renaming an entire directory to match the path will add the 
directory to the list of
+   monitored directories. Only the files in the directory whose 
modification time is
+   within the current window will be included in the stream.
+ + Once processed, changes to a file within the current window will 
not cause the file to be reread.
+   That is: *updates are ignored*.
+ + The more files under a directory/wildcard pattern, the longer it 
will take to
+   scan for changes —even if no files have actually changed.
+ + Calling `FileSystem.setTimes()` to fix the timestamp is a way to 
have the file picked
+   up in a later window, even if its contents have not changed.
 
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
 
For simple text files, there is an easier method 
`streamingContext.textFileStream(dataDirectory)`. And file streams do not 
require running a receiver, hence do not require allocating cores.
 
Python API 
`fileStream` is not available in the Python API, only  `textFileStream` is  
   available.
 
+
+Special points for HDFS
+
+The HDFS filesystem does not update a file's modification time while the file 
is being written to.
+Specifically:
+
++ `FileSystem.create()` creation: a zero-byte file is listed; creation 
and modification times are
+  set to the current time as seen on the NameNode.
+* Writes to a file via the output stream returned in the `create()` 
call: the modification
+  time *does not change*.
+* When `OutputStream.close()` is called, all remaining data is 
written, the file is closed, and
+  the NameNode is updated with the final size of the file. The 
modification time is set to
+  the time the file was closed.
+* Opening a file for appending via an `append()` operation does not 
change the modification
+  time of the file until the `close()` call is made on the output 
stream.
+* `FileSystem.setTimes()` can be used to explicitly set the time on a 
file.  
+* The rarely used operations:  `FileSystem.concat()`, 
`createSnapshot()`, `createSymlink()` and
+  `truncate()` all update the modification time.  
+ 
+Together, this means that when a file is opened, even before data has 
been completely written,
+it may be included in the DStream, after which updates to the file 
within the same window
+will be ignored. That is: changes may be missed, and data omitted from 
the stream. 
+To guarantee that changes are picked up in a window, write the file
+to an unmonitored directory, then immediately after the output stream 
is closed,
+rename it into the destination directory. 
+Provided the renamed file appears in the scanned destination directory 
during the window
+of its creation, the new data will be picked up.
+
+Object s

[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-09-16 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r79209668
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,44 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied, such as 
`hdfs://namenode:8040/logs/`.
+   All files directly under such a path will be processed as they are 
discovered.
+ + A regular expression can be supplied instead, such as
+   `hdfs://namenode:8040/logs/2016-*-31`.
+   Here, the DStream will consist of all files directly under those 
directories
+   matching the regular expression.
+   That is: it is a pattern of directories, not of files in 
directories.
+ + All files must be in the same data format.
+ * A file is considered part of a time period based on its 
modification time
+   —not its creation time.
+ + Files must be created in/moved under the `dataDirectory` 
directory/directories by
+   an atomic operation. In HDFS and similar filesystems, this can be 
done by *renaming* them
+   into the data directory from another part of the same filesystem.
+ * If a wildcard is used to identify directories, such as 
`hdfs://namenode:8040/logs/2016*`,
+   renaming an entire directory to match the path will add the 
directory to the list of
+   monitored directories. However, unless the modification times of the 
directory's files
+   are within that of the current window, they will not be recognized 
as new files.
+ + Once processed, changes to a file will not cause the file to be 
reread.
+   That is: Updates are ignored.
+ + The more files under a directory/wildcard pattern, the longer it 
will take to
+   scan for changes —even if no files have actually changed.
+
+Special points for object stores
--- End diff --

For object stores, direct writes to the directory, resulting in a PUT on 
close(), will guarantee that a file is picked up immediately. Things are 
actually a bit quirky for HDFS; even file length doesn't get updated reliably 
during a write-in-progress. I'll add a section there and then ask people who 
understand HDFS what is really happening
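
A hedged sketch of that object-store case, writing straight into the monitored location so the PUT issued by `close()` makes the finished object visible in one step; the bucket and key are illustrative assumptions.

```scala
import java.nio.charset.StandardCharsets
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val dest = new Path("s3a://bucket/logs/2016-05-31/events.log")
val fs = dest.getFileSystem(new Configuration())

// The object only appears once close() completes the upload, so the stream
// never observes a half-written file; no staging directory or rename is needed.
val out = fs.create(dest, true)
try {
  out.write("event-1\nevent-2\n".getBytes(StandardCharsets.UTF_8))
} finally {
  out.close()
}
```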


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-09-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r78932943
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,44 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied, such as 
`hdfs://namenode:8040/logs/`.
+   All files directly under such a path will be processed as they are 
discovered.
+ + A regular expression can be supplied instead, such as
--- End diff --

yes, the input is always a glob. You'd need to \ escape any file with ? in 
it. 
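
A tiny illustration of that escaping, assuming a `StreamingContext` named `ssc` and a made-up directory whose name genuinely contains a `?`:

```scala
// In Scala source, "\\?" is a backslash followed by '?'; the Hadoop glob code
// should then treat the '?' as a literal character, not a one-character wildcard.
val lines = ssc.textFileStream("hdfs://namenode:8040/odd\\?dir")
```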




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-09-15 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r78932779
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,44 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied, such as 
`hdfs://namenode:8040/logs/`.
+   All files directly under such a path will be processed as they are 
discovered.
+ + A regular expression can be supplied instead, such as
+   `hdfs://namenode:8040/logs/2016-*-31`.
+   Here, the DStream will consist of all files directly under those 
directories
+   matching the regular expression.
+   That is: it is a pattern of directories, not of files in 
directories.
+ + All files must be in the same data format.
+ * A file is considered part of a time period based on its 
modification time
+   —not its creation time.
+ + Files must be created in/moved under the `dataDirectory` 
directory/directories by
+   an atomic operation. In HDFS and similar filesystems, this can be 
done by *renaming* them
+   into the data directory from another part of the same filesystem.
+ * If a wildcard is used to identify directories, such as 
`hdfs://namenode:8040/logs/2016*`,
+   renaming an entire directory to match the path will add the 
directory to the list of
+   monitored directories. However, unless the modification times of the 
directory's files
+   are within that of the current window, they will not be recognized 
as new files.
+ + Once processed, changes to a file will not cause the file to be 
reread.
+   That is: Updates are ignored.
+ + The more files under a directory/wildcard pattern, the longer it 
will take to
+   scan for changes —even if no files have actually changed.
+
+Special points for object stores
--- End diff --

w.r.t slow renames, I think people just need to be aware that it can be 
slow and windows need to plan for it. I'm soon to work on faster renames on S3 
[HADOOP-13600](https://issues.apache.org/jira/browse/HADOOP-13600), but it will 
still be a non-atomic, O(largest-blob) operation


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-09-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r77314137
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,44 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied, such as 
`hdfs://namenode:8040/logs/`.
+   All files directly under such a path will be processed as they are 
discovered.
+ + A regular expression can be supplied instead, such as
--- End diff --

Also if I read the code correctly, it always treats the input as a glob. If 
so I think that's how it should be documented. That is, if you give it a path 
to a file containing "?" it's not going to do what you think. The glob isn't 
optional if I read it right.

But yeah this looks good to me code-wise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-09-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r77313878
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,44 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied, such as 
`hdfs://namenode:8040/logs/`.
+   All files directly under such a path will be processed as they are 
discovered.
+ + A regular expression can be supplied instead, such as
+   `hdfs://namenode:8040/logs/2016-*-31`.
+   Here, the DStream will consist of all files directly under those 
directories
+   matching the regular expression.
+   That is: it is a pattern of directories, not of files in 
directories.
+ + All files must be in the same data format.
+ * A file is considered part of a time period based on its 
modification time
+   —not its creation time.
+ + Files must be created in/moved under the `dataDirectory` 
directory/directories by
+   an atomic operation. In HDFS and similar filesystems, this can be 
done by *renaming* them
+   into the data directory from another part of the same filesystem.
+ * If a wildcard is used to identify directories, such as 
`hdfs://namenode:8040/logs/2016*`,
+   renaming an entire directory to match the path will add the 
directory to the list of
+   monitored directories. However, unless the modification times of the 
directory's files
+   are within that of the current window, they will not be recognized 
as new files.
+ + Once processed, changes to a file will not cause the file to be 
reread.
+   That is: Updates are ignored.
+ + The more files under a directory/wildcard pattern, the longer it 
will take to
+   scan for changes —even if no files have actually changed.
+
+Special points for object stores
--- End diff --

These may be so, but what of these are actionable to a user/developer? The 
point about wildcards is relevant, but what can they do about slow file renames?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-09-02 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r77313771
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,44 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied, such as 
`hdfs://namenode:8040/logs/`.
+   All files directly under such a path will be processed as they are 
discovered.
+ + A regular expression can be supplied instead, such as
--- End diff --

It's a Hadoop-style glob rather than regular expression right? Just 
checking that "*" in glob would mean "1 or more characters"?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-29 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r76593850
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
@@ -244,6 +244,31 @@ class SparkHadoopUtil extends Logging {
   }
 
   /**
+   * List directories/files matching the path and return the `FileStatus` 
results.
+   * If the pattern is not a regexp then a simple `getFileStatus(pattern)`
+   * is called to get the status of that path.
+   * If the path/pattern does not match anything in the filesystem,
+   * an empty sequence is returned.
+   * @param pattern pattern
+   * @return a possibly empty array of FileStatus entries
+   */
+  def globToFileStatus(pattern: Path): Array[FileStatus] = {
--- End diff --

essentially if anything which might be a wildcard is hit, it gets handed 
off to the globber for the full interpretation. Same for ^ and ], which are 
only part of a pattern within the context of an opening [

It's only those strings which can be verified to be regexp-free in a simple 
context-free string scan that say "absolutely no patterns here"

regarding the bigger change: most of it is isolation of the sensitive code 
*and the tests to verify behaviour*
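
A rough sketch of that strategy (wildcard-looking paths go to the globber, plain paths take a single `getFileStatus()` call); the method name here is illustrative and this is not the actual patch.

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// The characters that can introduce a wildcard, per the isGlobPath() discussion.
val globChars: Set[Char] = "{}[]*?\\".toSet

def maybeGlobStatus(fs: FileSystem, pattern: Path): Array[FileStatus] = {
  if (pattern.toString.exists(globChars)) {
    // Anything that might be a wildcard is handed off to the globber.
    Option(fs.globStatus(pattern)).getOrElse(Array.empty[FileStatus])
  } else {
    // A plain path needs only one round trip; a missing path yields an empty result.
    if (fs.exists(pattern)) Array(fs.getFileStatus(pattern)) else Array.empty[FileStatus]
  }
}
```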


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r76594233
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
@@ -244,6 +244,31 @@ class SparkHadoopUtil extends Logging {
   }
 
   /**
+   * List directories/files matching the path and return the `FileStatus` 
results.
+   * If the pattern is not a regexp then a simple `getFileStatus(pattern)`
+   * is called to get the status of that path.
+   * If the path/pattern does not match anything in the filesystem,
+   * an empty sequence is returned.
+   * @param pattern pattern
+   * @return a possibly empty array of FileStatus entries
+   */
+  def globToFileStatus(pattern: Path): Array[FileStatus] = {
--- End diff --

Yea, but then that's wrong if for example my path actually has a ? or ^ or 
] in it. It doesn't seem essential and seems even problematic to add this 
behavior change to an otherwise clear fix.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-29 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r76573274
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
@@ -244,6 +244,31 @@ class SparkHadoopUtil extends Logging {
   }
 
   /**
+   * List directories/files matching the path and return the `FileStatus` 
results.
+   * If the pattern is not a regexp then a simple `getFileStatus(pattern)`
+   * is called to get the status of that path.
+   * If the path/pattern does not match anything in the filesystem,
+   * an empty sequence is returned.
+   * @param pattern pattern
+   * @return a possibly empty array of FileStatus entries
+   */
+  def globToFileStatus(pattern: Path): Array[FileStatus] = {
--- End diff --

Here I'm narrowly concerned with the ambiguity of the behavior of a single 
method, because you can't distinguish between a path with a "?" in it and a 
glob wildcard for example. The rest seems orthogonal?

The change as it stood to resolve the issue in the OP seemed OK. This is 
bigger now and I'm not as sure about the rest of the change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-27 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r76514866
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
@@ -244,6 +244,31 @@ class SparkHadoopUtil extends Logging {
   }
 
   /**
+   * List directories/files matching the path and return the `FileStatus` 
results.
+   * If the pattern is not a regexp then a simple `getFileStatus(pattern)`
+   * is called to get the status of that path.
+   * If the path/pattern does not match anything in the filesystem,
+   * an empty sequence is returned.
+   * @param pattern pattern
+   * @return a possibly empty array of FileStatus entries
+   */
+  def globToFileStatus(pattern: Path): Array[FileStatus] = {
--- End diff --

It goes with the `globPathIfNecessary` call —you want to rename it to be 
consistent.


Regarding the FS APIs, there are way too many list operations, 
each with different flaws.

1. The simple `list(path, filter): Array[FS]` operations don't scale to a 
directory with hundreds of thousands of files, hence the remote iterator 
versions
1. None of them provide any consistency guarantees. Worth knowing. This is 
more common in remote iterators as the iteration window is bigger, but even in 
those that return arrays, in a large enough directory things may change during 
the enum
1. Anything that treewalks is very suboptimal on blobstores, somewhat 
inefficient for deep trees.
1. `listFiles(path, recursive=true)` is the sole one which object stores 
can currently optimise by avoiding the treewalk and just doing a bulk list. 
[HADOOP-13208](https://issues.apache.org/jira/browse/HADOOP-13208) has added 
that for S3A.
1. ..but that method filters out all directories, which means that apps 
which do want directories too are out of luck.
1. globStatus() is even less efficient than the others ... have a look at 
the source to see why.
1. In [HADOOP-13371](https://issues.apache.org/jira/browse/HADOOP-13371) 
I'm exploring an optimised globber, but I don't want to write one which 
collapses at scale (i.e in production).

I've added some comments in HADOOP-13371 about what to do there, I will 
probably do that "no regexp -> simple return" strategy implemented in this 
patch. But it will only benefit s3a in Hadoop 2.8+; patching spark benefits 
everything.
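
As an illustration of the `listFiles(path, recursive = true)` point above, a sketch of draining the returned `RemoteIterator`; the helper name is an assumption for this example.

```scala
import org.apache.hadoop.fs.{FileSystem, LocatedFileStatus, Path}

// Stream the listing rather than materialising it all at once; on stores with
// the optimisation described above this avoids the treewalk entirely.
def forEachFile(fs: FileSystem, dir: Path)(f: LocatedFileStatus => Unit): Unit = {
  val it = fs.listFiles(dir, true)  // RemoteIterator[LocatedFileStatus]
  while (it.hasNext) {
    f(it.next())
  }
}

// Example: print each file with its modification time.
// forEachFile(fs, new Path("s3a://bucket/logs")) { s =>
//   println(s"${s.getPath} ${s.getModificationTime}")
// }
```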



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-27 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r76512357
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
@@ -244,6 +244,31 @@ class SparkHadoopUtil extends Logging {
   }
 
   /**
+   * List directories/files matching the path and return the `FileStatus` 
results.
+   * If the pattern is not a regexp then a simple `getFileStatus(pattern)`
+   * is called to get the status of that path.
+   * If the path/pattern does not match anything in the filesystem,
+   * an empty sequence is returned.
+   * @param pattern pattern
+   * @return a possibly empty array of FileStatus entries
+   */
+  def globToFileStatus(pattern: Path): Array[FileStatus] = {
--- End diff --

I'm concerned that this method is necessarily ambiguous, because you can't 
actually distinguish globs from other paths. Is this really needed? That's why 
the FS API exposes two methods.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-24 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r76116321
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,39 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied (`hdfs://namenode:8040/logs/`). 
All files directly
+   underneath this path will be processed as they are discovered.
+ + A regular expression can be supplied instead, such as
+   `s3a://bucket/logs/[2015,2016]-??-??-friday`.
--- End diff --

Oh, is this not a regular expression? I'd change the doc then to not 
describe it as one. It sounds like it's following some other kind of glob 
syntax.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-24 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r76110009
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,39 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied (`hdfs://namenode:8040/logs/`). 
All files directly
+   underneath this path will be processed as they are discovered.
+ + A regular expression can be supplied instead, such as
+   `s3a://bucket/logs/[2015,2016]-??-??-friday`.
--- End diff --

Round () or {} brackets? Because the wildcard pattern used in 
`isGlobPath()` is `"{}[]*?\\"`. Curly only.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-24 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r76106355
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,39 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied (`hdfs://namenode:8040/logs/`). 
All files directly
+   underneath this path will be processed as they are discovered.
+ + A regular expression can be supplied instead, such as
+   `s3a://bucket/logs/[2015,2016]-??-??-friday`.
--- End diff --

You'll need (2015|2016) rather than (2015,2016). Also this is going to 
match zero or more hyphens followed by "-friday". I think you mean ".." or 
".{2}" or at least ".+" instead of "*" if this is a regex.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-24 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r76105141
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,39 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied (`hdfs://namenode:8040/logs/`). 
All files directly
+   underneath this path will be processed as they are discovered.
+ + A regular expression can be supplied instead, such as
+   `s3a://bucket/logs/[2015,2016]-??-??-friday`.
--- End diff --

going with `s3a://bucket/logs/(2015,2016)-*-friday`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r75946809
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,39 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied (`hdfs://namenode:8040/logs/`). 
All files directly
+   underneath this path will be processed as they are discovered.
+ + A regular expression can be supplied instead, such as
+   `s3a://bucket/logs/[2015,2016]-??-??-friday`.
--- End diff --

`(2015|2016)` would be what means "the string '2015' or '2016'", and `..` 
would mean "any two characters"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r75945926
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -196,29 +192,33 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, 
V]](
   logDebug(s"Getting new files for time $currentTime, " +
 s"ignoring files older than $modTimeIgnoreThreshold")
 
-  val newFileFilter = new PathFilter {
-def accept(path: Path): Boolean = isNewFile(path, currentTime, 
modTimeIgnoreThreshold)
-  }
-  val directoryFilter = new PathFilter {
-override def accept(path: Path): Boolean = 
fs.getFileStatus(path).isDirectory
+  val directories = if (SparkHadoopUtil.get.isGlobPath(directoryPath)) 
{
+fs.globStatus(directoryPath)
+.filter(_.isDirectory)
+.map(_.getPath)
+  } else {
+List(directoryPath).toArray
--- End diff --

yes, but obsolete right now with the refactoring in progress


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r75945790
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,39 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied (`hdfs://namenode:8040/logs/`). 
All files directly
+   underneath this path will be processed as they are discovered.
+ + A regular expression can be supplied instead, such as
+   `s3a://bucket/logs/[2015,2016]-??-??-friday`.
--- End diff --

wasn't it? My mistake. I wanted to show something fairly complex.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r75944093
  
--- Diff: docs/streaming-programming-guide.md ---
@@ -644,13 +644,39 @@ methods for creating DStreams from files as input 
sources.
 
 
 
-   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory (files written in nested directories not 
supported). Note that
-
- + The files must have the same data format.
- + The files must be created in the `dataDirectory` by atomically 
*moving* or *renaming* them into
- the data directory.
- + Once moved, the files must not be changed. So if the files are 
being continuously appended, the new data will not be read.
-
+   Spark Streaming will monitor the directory `dataDirectory` and process 
any files created in that directory.
+
+ + A simple directory can be supplied (`hdfs://namenode:8040/logs/`). 
All files directly
+   underneath this path will be processed as they are discovered.
+ + A regular expression can be supplied instead, such as
+   `s3a://bucket/logs/[2015,2016]-??-??-friday`.
--- End diff --

This isn't a regular expression though, what was the intent here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r75944182
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -196,29 +192,33 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, 
V]](
   logDebug(s"Getting new files for time $currentTime, " +
 s"ignoring files older than $modTimeIgnoreThreshold")
 
-  val newFileFilter = new PathFilter {
-def accept(path: Path): Boolean = isNewFile(path, currentTime, 
modTimeIgnoreThreshold)
-  }
-  val directoryFilter = new PathFilter {
-override def accept(path: Path): Boolean = 
fs.getFileStatus(path).isDirectory
+  val directories = if (SparkHadoopUtil.get.isGlobPath(directoryPath)) 
{
+fs.globStatus(directoryPath)
+.filter(_.isDirectory)
+.map(_.getPath)
+  } else {
+List(directoryPath).toArray
--- End diff --

Just `Array(directoryPath)`?
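A sketch of how the non-glob branch could read with that simplification (the method signature, object wrapper and package line are added here only so the fragment stands alone; the real code sits inside `FileInputDStream`):

    package org.apache.spark.streaming.dstream

    import org.apache.hadoop.fs.{FileSystem, Path}
    import org.apache.spark.deploy.SparkHadoopUtil

    object DirectoryResolutionSketch {
      // Expand a glob into the matching directories, or fall back to the path
      // itself; Array(directoryPath) replaces List(directoryPath).toArray.
      def directoriesToList(fs: FileSystem, directoryPath: Path): Array[Path] = {
        if (SparkHadoopUtil.get.isGlobPath(directoryPath)) {
          fs.globStatus(directoryPath).filter(_.isDirectory).map(_.getPath)
        } else {
          Array(directoryPath)
        }
      }
    }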





[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-20 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r75584026
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -293,8 +290,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   }
 
   /** Get file mod time from cache or fetch it from the file system */
-  private def getFileModTime(path: Path) = {
-fileToModTime.getOrElseUpdate(path.toString, 
fs.getFileStatus(path).getModificationTime())
+  private def getFileModTime(fs: FileStatus) = {
--- End diff --

Yes, I was just being minimal about the changes; inlining is easy.
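A sketch of what the inlining amounts to (not the actual patch; names as in the diff above):

    import org.apache.hadoop.fs.FileStatus

    object ModTimeSketch {
      // Previously, getFileModTime(path) went through the fileToModTime cache
      // and, on a miss, an fs.getFileStatus(path) round trip. With the
      // FileStatus already returned by the listing, the same value is a plain
      // field access:
      def modTimeOf(status: FileStatus): Long = status.getModificationTime
    }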





[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-20 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r75584030
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -241,16 +233,21 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, 
V]](
*   The files with mod time T+5 are not remembered and cannot be 
ignored (since, t+5 > t+1).
*   Hence they can get selected as new files again. To prevent this, 
files whose mod time is more
*   than current batch time are not considered.
+   * @param fs file status
+   * @param currentTime time of the batch
+   * @param modTimeIgnoreThreshold the ignore threshold
+   * @return true if the file has been modified within the batch window
*/
-  private def isNewFile(path: Path, currentTime: Long, 
modTimeIgnoreThreshold: Long): Boolean = {
+ private def isNewFile(fs: FileStatus, currentTime: Long, 
modTimeIgnoreThreshold: Long): Boolean = {
--- End diff --

I'll fix this





[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-20 Thread petermaxlee
Github user petermaxlee commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r75583457
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -293,8 +290,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   }
 
   /** Get file mod time from cache or fetch it from the file system */
-  private def getFileModTime(path: Path) = {
-fileToModTime.getOrElseUpdate(path.toString, 
fs.getFileStatus(path).getModificationTime())
+  private def getFileModTime(fs: FileStatus) = {
--- End diff --

Should we just remove this function now?





[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-20 Thread petermaxlee
Github user petermaxlee commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r75583446
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -241,16 +233,21 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, 
V]](
*   The files with mod time T+5 are not remembered and cannot be 
ignored (since, t+5 > t+1).
*   Hence they can get selected as new files again. To prevent this, 
files whose mod time is more
*   than current batch time are not considered.
+   * @param fs file status
+   * @param currentTime time of the batch
+   * @param modTimeIgnoreThreshold the ignore threshold
+   * @return true if the file has been modified within the batch window
*/
-  private def isNewFile(path: Path, currentTime: Long, 
modTimeIgnoreThreshold: Long): Boolean = {
+ private def isNewFile(fs: FileStatus, currentTime: Long, 
modTimeIgnoreThreshold: Long): Boolean = {
--- End diff --

Also, `fs` is pretty confusing, because in this context it is usually used to 
refer to a FileSystem. We should pick a different name.
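One way the rename could look (a sketch only, not the actual patch; the real method also rejects files already recorded in `recentlySelectedFiles`):

    import org.apache.hadoop.fs.FileStatus

    object IsNewFileSketch {
      // Only the mod-time window is shown here: accept files whose modification
      // time falls in the interval (modTimeIgnoreThreshold, currentTime].
      def isNewFile(
          fileStatus: FileStatus,
          currentTime: Long,
          modTimeIgnoreThreshold: Long): Boolean = {
        val modTime = fileStatus.getModificationTime
        modTime > modTimeIgnoreThreshold && modTime <= currentTime
      }
    }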






[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-20 Thread petermaxlee
Github user petermaxlee commented on a diff in the pull request:

https://github.com/apache/spark/pull/14731#discussion_r75583436
  
--- Diff: 
streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 ---
@@ -241,16 +233,21 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, 
V]](
*   The files with mod time T+5 are not remembered and cannot be 
ignored (since, t+5 > t+1).
*   Hence they can get selected as new files again. To prevent this, 
files whose mod time is more
*   than current batch time are not considered.
+   * @param fs file status
+   * @param currentTime time of the batch
+   * @param modTimeIgnoreThreshold the ignore threshold
+   * @return true if the file has been modified within the batch window
*/
-  private def isNewFile(path: Path, currentTime: Long, 
modTimeIgnoreThreshold: Long): Boolean = {
+ private def isNewFile(fs: FileStatus, currentTime: Long, 
modTimeIgnoreThreshold: Long): Boolean = {
--- End diff --

The indentation is wrong here.





[GitHub] spark pull request #14731: [SPARK-17159] [streaming]: optimise check for new...

2016-08-20 Thread steveloughran
GitHub user steveloughran opened a pull request:

https://github.com/apache/spark/pull/14731

[SPARK-17159] [streaming]: optimise check for new files in FileInputDStream

## What changes were proposed in this pull request?

This PR optimises the filesystem metadata reads in `FileInputDStream` by moving the 
filters previously passed to `FileSystem.globStatus` and `FileSystem.listStatus` into 
post-filtering of the `FileStatus` instances those calls return, avoiding the need to 
look up extra `FileStatus` instances within the `FileSystem` operations. A condensed 
before/after sketch follows the list below.

* This doesn't add overhead to the filtering process; that's done as 
post-processing in the `FileSystem` glob/list operations anyway.
* At worst it may result in larger lists being built up and returned.
* For every glob match of a file, the code saves 1 RPC call to the HDFS 
NameNode, and 1 GET against S3.
* For every glob match of a directory, the code saves 1 RPC call and 2-3 HTTP 
calls to S3 for the directory check (including a slow LIST call whenever the 
directory has children, since it no longer exists as a blob).
* For the modtime check of every file, it saves a Hadoop RPC call and, against 
object stores *which don't implement any client-side cache*, an HTTP GET.
* By entirely eliminating the `getFileStatus()` calls on the listed files, 
it should reduce the risk of AWS S3 throttling the HTTP requests, as it does 
when too many requests are made to parts of a single S3 bucket.
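A condensed sketch of the before/after pattern (names and the filtering condition are simplified; the real code also handles glob expansion, the mod-time ignore window and the recently-selected set):

    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}

    object ListingSketch {
      // Before: the PathFilter only sees a Path, so deciding on modification
      // time costs a second getFileStatus() round trip per candidate file
      // (an RPC to the NameNode, or HTTP requests against S3).
      def listNewFilesBefore(fs: FileSystem, dir: Path, threshold: Long): Array[FileStatus] = {
        val newFileFilter = new PathFilter {
          override def accept(path: Path): Boolean =
            fs.getFileStatus(path).getModificationTime > threshold
        }
        fs.listStatus(dir, newFileFilter)
      }

      // After: filter the FileStatus instances the listing already returned;
      // the only remote call left is the single listStatus().
      def listNewFilesAfter(fs: FileSystem, dir: Path, threshold: Long): Array[FileStatus] = {
        fs.listStatus(dir).filter(_.getModificationTime > threshold)
      }
    }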

## How was this patch tested?

Running the Spark Streaming tests as a regression suite. In the SPARK-7481 
cloud code, I could add a test against S3 which prints to stdout the exact 
number of HTTP requests made to S3 before and after the patch, so as to 
validate the speedup. (The S3A metrics in Hadoop 2.8+ are accessible at the API 
level, but as that API was only added in 2.8, using it would stop the proposed 
module from building against Hadoop 2.7; logging and manual assessment is the 
only cross-version strategy.)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/steveloughran/spark 
cloud/SPARK-17159-listfiles

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/14731.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #14731


commit 738c51bb57f331c58a877aa20aa5e2beb1084114
Author: Steve Loughran 
Date:   2016-08-20T10:50:34Z

SPARK-17159: move filtering of directories and files out of glob/list 
filters and into filtering of the FileStatus instances returned in the results, 
so avoiding the need to create FileStatus instances for

-This doesn't add overhead to the filtering process; that's done as 
post-processing in FileSystem anyway. At worst it may result in larger lists 
being built up and returned.
-For every glob match, the code saves 2 RPC calls to the HDFS NN
-The code saves 1-3 HTTP calls to S3 for the directory check (including a 
slow List call whenever the directory has children as it doesn't exist as a 
blob any more)
-for the modtime check of every file, it saves an HTTP GET

The whole modtime cache can be eliminated; it's a performance optimisation 
to avoid the overhead of the file checks, one that is no longer needed.



