[GitHub] spark pull request #17745: [SPARK-17159][Streaming] optimise check for new f...

2018-08-26 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/17745#discussion_r212822877
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       logDebug(s"Getting new files for time $currentTime, " +
         s"ignoring files older than $modTimeIgnoreThreshold")
 
-      val newFileFilter = new PathFilter {
-        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
-      }
-      val directoryFilter = new PathFilter {
-        override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
-      }
-      val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
+      val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
--- End diff --

Still a lot of requests; I think we can do a new one.

The latest version of this code is [here](https://github.com/hortonworks-spark/cloud-integration/tree/master/spark-cloud-integration); I think it's time to set up a module in Bahir for this.


---




[GitHub] spark pull request #17745: [SPARK-17159][Streaming] optimise check for new f...

2018-08-24 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/17745#discussion_r212571984
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       logDebug(s"Getting new files for time $currentTime, " +
         s"ignoring files older than $modTimeIgnoreThreshold")
 
-      val newFileFilter = new PathFilter {
-        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
-      }
-      val directoryFilter = new PathFilter {
-        override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
-      }
-      val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
+      val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
--- End diff --

Yes, having an object-store-specific version of glob will be broadly helpful. In the meantime, this patch seems to save a lot of HTTP requests.




---




[GitHub] spark pull request #17745: [SPARK-17159][Streaming] optimise check for new f...

2018-08-23 Thread steveloughran
Github user steveloughran commented on a diff in the pull request:

https://github.com/apache/spark/pull/17745#discussion_r212391371
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       logDebug(s"Getting new files for time $currentTime, " +
         s"ignoring files older than $modTimeIgnoreThreshold")
 
-      val newFileFilter = new PathFilter {
-        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
-      }
-      val directoryFilter = new PathFilter {
-        override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
-      }
-      val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
+      val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
--- End diff --

globStatus is flawed; its key limitation is that it does a tree walk. It needs to be replaced with an object-store-list-specific implementation. See [HADOOP-13371](https://issues.apache.org/jira/browse/HADOOP-13371).

The issue with implementing an s3a flat-list-and-filter is that if the wildcard is a few entries up from the child path and there are lots of children, e.g.

```
s3a://bucket/data/year=201?/month=*/day=*/
```

then if there are many files under the year/month/day entries, all of them get listed during the filter.

What I think would need to be done is to make the FS configurable to limit the depth at which it switches to bulk listing; here I could say "depth=2", so the year=? would be done via globbing, but the month= and day= would be handled by the flat listing.

Or maybe just start with making the whole thing optional, and let the caller deal with it.
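For illustration only, here is a minimal Scala sketch of the flat-list-and-filter idea. `flatListAndFilter` is a hypothetical helper, not the HADOOP-13371 design: it matches the glob against the full file path, handles only `*` and `?`, and still pays the full listing cost that a depth limit would bound.

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: one recursive LIST of the prefix, then client-side
// filtering of full paths against the glob. Only "*" and "?" are handled,
// by escaping everything else; a real version needs the full glob grammar.
def flatListAndFilter(fs: FileSystem, root: Path, pathGlob: String): Seq[Path] = {
  val regex = java.util.regex.Pattern.quote(pathGlob)
    .replace("*", "\\E[^/]*\\Q")   // "*" matches within one path component
    .replace("?", "\\E[^/]\\Q")    // "?" matches a single character
  val matched = ArrayBuffer[Path]()
  val listing = fs.listFiles(root, true)  // paged LIST calls, no per-entry HEAD
  while (listing.hasNext) {
    val p = listing.next().getPath
    if (p.toUri.getPath.matches(regex)) matched += p
  }
  matched.toSeq
}
```

With the depth=2 idea, only the components below the cut-off would be matched this way; the year=? component would still go through the tree-walking glob.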

Anyway, the options here:

* Fix the Hadoop-side call. Nice and broadly useful.
* See if Spark can be moved off the globStatus call. That will change matching, but if you provide a new "cloudstore" connector, that could be done, couldn't it?


---




[GitHub] spark pull request #17745: [SPARK-17159][Streaming] optimise check for new f...

2018-08-23 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/17745#discussion_r212267619
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       logDebug(s"Getting new files for time $currentTime, " +
         s"ignoring files older than $modTimeIgnoreThreshold")
 
-      val newFileFilter = new PathFilter {
-        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
-      }
-      val directoryFilter = new PathFilter {
-        override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
-      }
-      val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
+      val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
--- End diff --

So, on looking at the code of globStatus: it applies the filter at the end anyway, so doing something like the above might just be OK.

Also, globStatus does a listStatus() per child directory, or a getFileStatus() in case the input pattern is not a glob; each call to listStatus does 3+ HTTP calls, and each call to getFileStatus does 2 HTTP calls.
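
To put rough numbers on that, here is purely illustrative arithmetic from the figures above, assuming D matched directories each holding F files, and ignoring the glob expansion itself, which both versions pay:

```scala
// Rough HTTP request counts per scan, using listStatus ≈ 3 calls and
// getFileStatus ≈ 2 calls as quoted above. Illustrative, not measured.
def oldRequests(d: Int, f: Int): Int =
  (d * 2) +     // directoryFilter: one getFileStatus probe per glob candidate
  (d * 3) +     // one listStatus per matched directory
  (d * f * 2)   // one getFileStatus per file to read its timestamp

def newRequests(d: Int, f: Int): Int =
  d * 3         // just the listStatus; its FileStatus results are reused

// e.g. oldRequests(10, 100) == 2050 vs newRequests(10, 100) == 30
```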


---




[GitHub] spark pull request #17745: [SPARK-17159][Streaming] optimise check for new f...

2018-08-23 Thread ScrapCodes
Github user ScrapCodes commented on a diff in the pull request:

https://github.com/apache/spark/pull/17745#discussion_r212251757
  
--- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala ---
@@ -196,29 +191,29 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
       logDebug(s"Getting new files for time $currentTime, " +
         s"ignoring files older than $modTimeIgnoreThreshold")
 
-      val newFileFilter = new PathFilter {
-        def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold)
-      }
-      val directoryFilter = new PathFilter {
-        override def accept(path: Path): Boolean = fs.getFileStatus(path).isDirectory
-      }
-      val directories = fs.globStatus(directoryPath, directoryFilter).map(_.getPath)
+      val directories = Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
--- End diff --

In this approach, we might be fetching a very large list of files and then filtering it down to the directories. If the fetched list is too large, that can be a problem.


---




[GitHub] spark pull request #17745: [SPARK-17159][Streaming] optimise check for new f...

2017-09-28 Thread steveloughran
Github user steveloughran closed the pull request at:

https://github.com/apache/spark/pull/17745


---




[GitHub] spark pull request #17745: [SPARK-17159][Streaming] optimise check for new f...

2017-04-24 Thread steveloughran
GitHub user steveloughran opened a pull request:

https://github.com/apache/spark/pull/17745

[SPARK-17159][Streaming] optimise check for new files in FileInputDStream

## What changes were proposed in this pull request?

Changes to `FileInputDStream` to eliminate multiple `getFileStatus()` calls when scanning directories for new files.

This is a minor optimisation when working with filesystems, but significant when working with object stores, as it eliminates HTTP requests per source file when scanning the system. The current cost is 1-3 probes to see whether a path is a directory, plus one more to actually timestamp a file. The new patch gets the file status once and retains it through all the operations, so it never needs to be re-evaluated.

This optimisation saves 3 HTTP requests per source directory and 1 per file, for every directory in the scan list and every file in the scanned directories, irrespective of the age of the directories. At 100+ ms per HEAD request against S3, the speedup is significant, even when there are few files in the scanned directories.

### Before

1. Two separate list operations: `globStatus()` to find directories, then `listStatus()` to scan for new files under those directories.
1. The path filter in the `globStatus()` operation calls `getFileStatus(filename)` to probe whether a path is a directory.
1. `getFileStatus()` is also used in the `listStatus()` scan to check each file's timestamp.

Against an object store, `getFileStatus()` can cost 1-4 HTTPS requests per call (HEAD path, HEAD path + "/", LIST path).

As both list operations already return an array or iterator of `FileStatus` objects, these extra probes are utterly superfluous: the filtering can take place on the listing results themselves.

### After

1. The output of `globStatus()` is filtered to select only directories.
1. The output of `listStatus()` is filtered by timestamp (see the sketch below).
1. The special failure case of `globStatus()`, no matching path, is handled in the warning text by saying "No Directory to scan" and omitting the full stack trace.
1. The `fileToModTime` map is superfluous, and so is deleted.
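
A minimal sketch of that "after" flow, assuming `fs`, `directoryPath` and `modTimeIgnoreThreshold` are in scope as in `FileInputDStream` (the surrounding batching and logging are elided):

```scala
import org.apache.hadoop.fs.{FileStatus, Path}

// globStatus returns null when nothing matches a non-glob path, so guard it.
val dirStatuses: Array[FileStatus] =
  Option(fs.globStatus(directoryPath)).getOrElse(Array.empty[FileStatus])
// Filter on the statuses already in hand: no extra getFileStatus probes.
val directories: Array[Path] = dirStatuses.filter(_.isDirectory).map(_.getPath)
val newFiles: Array[String] = directories.flatMap { dir =>
  fs.listStatus(dir)
    .filter(s => !s.isDirectory && s.getModificationTime >= modTimeIgnoreThreshold)
    .map(_.getPath.toString)  // timestamp comes from the listing's FileStatus
}
```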

## How was this patch tested?

1. There is a new test in `org.apache.spark.streaming.InputStreamsSuite`.
1. I have object-store integration tests in an external repository, which have been used to verify functionality and that the number of HTTP requests is reduced when invoked against S3A endpoints.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/steveloughran/spark cloud/SPARK-17159-listfiles-minimal

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/17745.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #17745


commit f3ffe1db2e5edc9b6a60fb48b34b3099853e4324
Author: Steve Loughran 
Date:   2017-04-24T13:04:04Z

SPARK-17159 minimal patch of changes to FileInputDStream to reduce file status requests when querying files. This is a minor optimisation when working with filesystems, but significant when working with object stores.

Change-Id: I269d98902f615818941c88de93a124c65453756e




---