[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/17120 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r105097597 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -79,9 +81,16 @@ class FileStreamSource( sourceOptions.maxFileAgeMs } + private val fileNameOnly = sourceOptions.fileNameOnly + if (fileNameOnly) { +logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " + + "UUID), otherwise, files using the same name will be considered as the same file and causes" + + " data lost") --- End diff -- fixed -- thank you! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r105094841 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -79,9 +81,16 @@ class FileStreamSource( sourceOptions.maxFileAgeMs } + private val fileNameOnly = sourceOptions.fileNameOnly + if (fileNameOnly) { +logWarning("'fileNameOnly' is enabled. Make sure your file names are unique (e.g. using " + + "UUID), otherwise, files using the same name will be considered as the same file and causes" + + " data lost") --- End diff -- nit: if I may, this message sounds a bit odd. > files using the same name will be considered as the same file and causes data lost could we say `files with the same name but under different paths will be considered the same and causes data lost` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r105080626 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -309,6 +315,10 @@ object FileStreamSource { def size: Int = map.size() +/** + * Note when `fileNameOnly` is true, each entry would be (file name, timestamp) rather than + * (full path, timestamp). + */ def allEntries: Seq[(String, Timestamp)] = { --- End diff -- deleted :) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r105080572 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -75,7 +77,7 @@ class FileStreamSource( /** A mapping from a file that we have processed to some timestamp it was last modified. */ // Visible for testing and debugging in production. - val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) + val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs, sourceOptions.fileNameOnly) --- End diff -- added. thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r105047840 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -75,7 +77,7 @@ class FileStreamSource( /** A mapping from a file that we have processed to some timestamp it was last modified. */ // Visible for testing and debugging in production. - val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs) + val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs, sourceOptions.fileNameOnly) --- End diff -- It's better to add a warning when `fileNameOnly` is true. How about ``` logWarning("fileNameOnly is enabled. Make user your file names are unique (e.g., using UUID), otherwise, files using the same name will be considered as the same file and causes data lost") ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r105048802 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala --- @@ -309,6 +315,10 @@ object FileStreamSource { def size: Int = map.size() +/** + * Note when `fileNameOnly` is true, each entry would be (file name, timestamp) rather than + * (full path, timestamp). + */ def allEntries: Seq[(String, Timestamp)] = { --- End diff -- This method is not used. Could you just delete it? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r104287612 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala --- @@ -1253,8 +1253,26 @@ class FileStreamSourceSuite extends FileStreamSourceTest { assert(map.isNewFile("e", 20)) } + test("SeenFilesMap with fileNameOnly = true") { +val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true) + +map.add("file:///a/b/c/d", 5) +map.add("file:///a/b/c/e", 5) +assert(map.size == 2) --- End diff -- ah, thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user steveloughran commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r104286483 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala --- @@ -1253,8 +1253,26 @@ class FileStreamSourceSuite extends FileStreamSourceTest { assert(map.isNewFile("e", 20)) } + test("SeenFilesMap with fileNameOnly = true") { +val map = new SeenFilesMap(maxAgeMs = 10, fileNameOnly = true) + +map.add("file:///a/b/c/d", 5) +map.add("file:///a/b/c/e", 5) +assert(map.size == 2) --- End diff -- recommend `===` for better error reporting --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r104276335 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1052,10 +1052,18 @@ Here are the details of all the sinks in Spark. Append path: path to the output directory, must be specified. + maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max) -latestFirst: whether to processs the latest new files first, useful when there is a large backlog of files(default: false) - +latestFirst: whether to processs the latest new files first, useful when there is a large backlog of files (default: false) + +fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same: + +· "file:///dataset.txt" +· "s3://a/dataset.txt" +· "s3n://a/b/dataset.txt" +· "s3a://a/b/c/dataset.txt" --- End diff -- the incidents of a `` does not look pretty, so I'm using a dot here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
Github user lw-lin commented on a diff in the pull request: https://github.com/apache/spark/pull/17120#discussion_r104276118 --- Diff: docs/structured-streaming-programming-guide.md --- @@ -1052,10 +1052,18 @@ Here are the details of all the sinks in Spark. Append path: path to the output directory, must be specified. + maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max) latestFirst: whether to processs the latest new files first, useful when there is a large backlog of files(default: false) - + +fileNameOnly: whether to check new files based on only the filename instead of on the full path. With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same: + +· "file:///dataset.txt" +· "s3://a/dataset.txt" +· "s3n://a/b/dataset.txt" +· "s3a://a/b/c/dataset.txt" --- End diff -- the incidents of a `` does not look pretty, so I'm using a dot here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #17120: [SPARK-19715][Structured Streaming] Option to Str...
GitHub user lw-lin opened a pull request: https://github.com/apache/spark/pull/17120 [SPARK-19715][Structured Streaming] Option to Strip Paths in FileSource ## What changes were proposed in this pull request? Today, we compare the whole path when deciding if a file is new in the FileSource for structured streaming. However, this cause cause false negatives in the case where the path has changed in a cosmetic way (i.e. changing `s3n` to `s3a`). This patch adds an option `fileNameOnly` that causes the new file check to be based only on the filename (but still store the whole path in the log). ## Usage ```scala spark .readStream .option("fileNameOnly", true) .text("s3n://bucket/dir1/dir2") .writeStream ... ``` ## How was this patch tested? Added a test case You can merge this pull request into a Git repository by running: $ git pull https://github.com/lw-lin/spark filename-only Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/17120.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #17120 commit aeb10d100a24ca644745fb8b26985b584fd5118e Author: Liwei Lin Date: 2017-02-28T15:29:17Z Add support for `fileNameOnly` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org