spark git commit: [SPARK-18826][SS] Add 'latestFirst' option to FileStreamSource
Repository: spark Updated Branches: refs/heads/master 4f7292c87 -> 68a6dc974 [SPARK-18826][SS] Add 'latestFirst' option to FileStreamSource ## What changes were proposed in this pull request? When starting a stream with a lot of backfill and maxFilesPerTrigger, users often want to start with the most recent files first. This lets them keep latency low for recent data while slowly backfilling historical data. This PR adds a new option `latestFirst` to control this behavior. When it is true, `FileStreamSource` sorts the files by modification time from latest to oldest and takes the first `maxFilesPerTrigger` files as a new batch. ## How was this patch tested? Added a new test. Author: Shixiong Zhu Closes #16251 from zsxwing/newest-first. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68a6dc97 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68a6dc97 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68a6dc97 Branch: refs/heads/master Commit: 68a6dc974b25e6eddef109f6fd23ae4e9775ceca Parents: 4f7292c Author: Shixiong Zhu Authored: Thu Dec 15 13:17:51 2016 -0800 Committer: Tathagata Das Committed: Thu Dec 15 13:17:51 2016 -0800 -- .../execution/streaming/FileStreamOptions.scala | 14 ++ .../execution/streaming/FileStreamSource.scala | 11 - .../sql/streaming/FileStreamSourceSuite.scala | 47 3 files changed, 71 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/68a6dc97/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index fdea65c..25ebe17 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -53,4 +53,18 @@ class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging { /** Options as specified by the user, in a case-insensitive map, without "path" set. */ val optionMapWithoutPath: Map[String, String] = parameters.filterKeys(_ != "path") + + /** + * Whether to scan latest files first. If it's true, when the source finds unprocessed files in a + * trigger, it will first process the latest files. + */ + val latestFirst: Boolean = parameters.get("latestFirst").map { str => +try { + str.toBoolean +} catch { + case _: IllegalArgumentException => +throw new IllegalArgumentException( + s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'") +} + }.getOrElse(false) } http://git-wip-us.apache.org/repos/asf/spark/blob/68a6dc97/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 20e0dce..39c0b49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -62,6 +62,15 @@ class FileStreamSource( /** Maximum number of new files to be considered in each batch */ private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger + private val fileSortOrder = if (sourceOptions.latestFirst) { + logWarning( +"""'latestFirst' is true. New files will be processed first. + |It may affect the watermark value""".stripMargin) + implicitly[Ordering[Long]].reverse +} else { + implicitly[Ordering[Long]] +} + /** A mapping from a file that we have processed to some timestamp it was last modified. */ // Visible for testing and debugging in production. 
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) @@ -155,7 +164,7 @@ class FileStreamSource( val startTime = System.nanoTime val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType)) -val files = catalog.allFiles().sortBy(_.getModificationTime).map { status => +val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status => (status.getPath.toUri.toString, status.getModificationTime) } val endTime = System.nanoTime http://git-wip-us.apache.org/repos/asf/sp
spark git commit: [SPARK-18826][SS] Add 'latestFirst' option to FileStreamSource
Repository: spark Updated Branches: refs/heads/branch-2.1 e430915fa -> 900ce558a [SPARK-18826][SS] Add 'latestFirst' option to FileStreamSource ## What changes were proposed in this pull request? When starting a stream with a lot of backfill and maxFilesPerTrigger, users often want to start with the most recent files first. This lets them keep latency low for recent data while slowly backfilling historical data. This PR adds a new option `latestFirst` to control this behavior. When it is true, `FileStreamSource` sorts the files by modification time from latest to oldest and takes the first `maxFilesPerTrigger` files as a new batch. ## How was this patch tested? Added a new test. Author: Shixiong Zhu Closes #16251 from zsxwing/newest-first. (cherry picked from commit 68a6dc974b25e6eddef109f6fd23ae4e9775ceca) Signed-off-by: Tathagata Das Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/900ce558 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/900ce558 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/900ce558 Branch: refs/heads/branch-2.1 Commit: 900ce558a238fb9d8220527d8313646fe6830695 Parents: e430915 Author: Shixiong Zhu Authored: Thu Dec 15 13:17:51 2016 -0800 Committer: Tathagata Das Committed: Thu Dec 15 13:18:06 2016 -0800 -- .../execution/streaming/FileStreamOptions.scala | 14 ++ .../execution/streaming/FileStreamSource.scala | 11 - .../sql/streaming/FileStreamSourceSuite.scala | 47 3 files changed, 71 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/900ce558/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala index fdea65c..25ebe17 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala @@ -53,4 +53,18 @@ class FileStreamOptions(parameters: CaseInsensitiveMap) extends Logging { /** Options as specified by the user, in a case-insensitive map, without "path" set. */ val optionMapWithoutPath: Map[String, String] = parameters.filterKeys(_ != "path") + + /** + * Whether to scan latest files first. If it's true, when the source finds unprocessed files in a + * trigger, it will first process the latest files. + */ + val latestFirst: Boolean = parameters.get("latestFirst").map { str => +try { + str.toBoolean +} catch { + case _: IllegalArgumentException => +throw new IllegalArgumentException( + s"Invalid value '$str' for option 'latestFirst', must be 'true' or 'false'") +} + }.getOrElse(false) } http://git-wip-us.apache.org/repos/asf/spark/blob/900ce558/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala index 20e0dce..39c0b49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala @@ -62,6 +62,15 @@ class FileStreamSource( /** Maximum number of new files to be considered in each batch */ private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger + private val fileSortOrder = if (sourceOptions.latestFirst) { + logWarning( +"""'latestFirst' is true. New files will be processed first. + |It may affect the watermark value""".stripMargin) + implicitly[Ordering[Long]].reverse +} else { + implicitly[Ordering[Long]] +} + /** A mapping from a file that we have processed to some timestamp it was last modified. 
*/ // Visible for testing and debugging in production. val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) @@ -155,7 +164,7 @@ class FileStreamSource( val startTime = System.nanoTime val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath) val catalog = new InMemoryFileIndex(sparkSession, globbedPaths, options, Some(new StructType)) -val files = catalog.allFiles().sortBy(_.getModificationTime).map { status => +val files = catalog.allFiles().sortBy(_.getModificationTime)(fileSortOrder).map { status => (status.getPath.toUri.toString, stat