[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r358624511

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

@@ -330,4 +341,96 @@ object FileStreamSource {
     def size: Int = map.size()
   }
+
+  private[sql] trait FileStreamSourceCleaner {
+    def clean(entry: FileEntry): Unit
+  }
+
+  private[sql] object FileStreamSourceCleaner {
+    def apply(
+        fileSystem: FileSystem,
+        sourcePath: Path,
+        option: FileStreamOptions,
+        hadoopConf: Configuration): Option[FileStreamSourceCleaner] = option.cleanSource match {
+      case CleanSourceMode.ARCHIVE =>
+        require(option.sourceArchiveDir.isDefined)
+        val path = new Path(option.sourceArchiveDir.get)
+        val archiveFs = path.getFileSystem(hadoopConf)
+        val qualifiedArchivePath = archiveFs.makeQualified(path)
+        Some(new SourceFileArchiver(fileSystem, sourcePath, archiveFs, qualifiedArchivePath))
+
+      case CleanSourceMode.DELETE =>
+        Some(new SourceFileRemover(fileSystem))
+
+      case _ => None
+    }
+  }
+
+  private[sql] class SourceFileArchiver(
+      fileSystem: FileSystem,
+      sourcePath: Path,
+      baseArchiveFileSystem: FileSystem,
+      baseArchivePath: Path) extends FileStreamSourceCleaner with Logging {
+    assertParameters()
+
+    private def assertParameters(): Unit = {
+      require(fileSystem.getUri == baseArchiveFileSystem.getUri, "Base archive path is located " +
+        s"on a different file system than the source files. source path: $sourcePath" +
+        s" / base archive path: $baseArchivePath")
+
+      /**
+       * FileStreamSource reads the files which one of below conditions is met:
+       * 1) file itself is matched with source path
+       * 2) parent directory is matched with source path

Review comment:
FYI, just filed https://issues.apache.org/jira/browse/SPARK-30281 and raised a patch picking option 2: #26920

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
With regards, Apache Git Services
-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
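The cleaner factory in the hunk above dispatches on the configured clean-source mode. As a simplified, self-contained sketch of that dispatch (using hypothetical stand-in types; the real code takes Hadoop's FileSystem/Path and Spark's FileStreamOptions, and the archiver/remover actually move or delete files):

```scala
// Minimal stand-ins for the real Spark/Hadoop types, for illustration only.
object CleanSourceMode extends Enumeration {
  val ARCHIVE, DELETE, OFF = Value
}

trait FileStreamSourceCleaner {
  def clean(path: String): Unit
}

class SourceFileArchiver(archiveDir: String) extends FileStreamSourceCleaner {
  // The real archiver renames the file under the base archive path.
  def clean(path: String): Unit = println(s"move $path -> $archiveDir")
}

class SourceFileRemover extends FileStreamSourceCleaner {
  // The real remover deletes the completed source file.
  def clean(path: String): Unit = println(s"delete $path")
}

// Mirrors FileStreamSourceCleaner.apply: ARCHIVE requires an archive dir,
// DELETE needs no extra config, and any other mode yields no cleaner.
def buildCleaner(
    mode: CleanSourceMode.Value,
    sourceArchiveDir: Option[String]): Option[FileStreamSourceCleaner] = mode match {
  case CleanSourceMode.ARCHIVE =>
    require(sourceArchiveDir.isDefined, "sourceArchiveDir must be set for ARCHIVE mode")
    Some(new SourceFileArchiver(sourceArchiveDir.get))
  case CleanSourceMode.DELETE =>
    Some(new SourceFileRemover())
  case _ => None
}
```

The Option return lets the caller skip cleanup entirely when no mode is configured, rather than threading a no-op cleaner through the source.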
[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r347673657

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

@@ -330,4 +341,96 @@ object FileStreamSource {
+      /**
+       * FileStreamSource reads the files which one of below conditions is met:
+       * 1) file itself is matched with source path
+       * 2) parent directory is matched with source path

Review comment:
@zsxwing Thanks for spending your time to revisit this! The condition is based on the test suite in FileStreamSource, but for partitioned paths, yes, that's missed. Nice finding.

I need to update the condition, or just remove it from the documentation altogether. As for `recursiveFileLookup`, it came later than this patch and I missed it. The condition was formed early this year, and recursiveFileLookup seems to have come in mid-year. With these two cases added, FileStreamSource can read any file under the source path, which invalidates the depth check.

There are three options to deal with this:

1) No pattern check; just try to rename, and log if the rename fails. (Caution: this doesn't prevent an archived file from being picked up as a source file again in a different directory.)

2) Disallow any path as base archive path if the path matches the source path (glob) - here "disallow" means fail the query. After that we don't need to check the pattern. If end users provide a complicated glob path as source path, they may be puzzled about how to avoid matching it, but I'm not sure they would really want such a complicated path in production.

3) Do a pattern check before renaming, though this requires checking the pattern per file. We may optimize this a bit by grouping files per directory and checking the pattern against the directory instead of individual files. This doesn't fail the query, so end users need to check whether files were left uncleaned due to the pattern check.

Which one (or combination) would be the preferred approach?
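The per-directory optimization described in option 3 can be sketched in plain Scala: group completed files by parent directory and run the (costly) glob check once per directory rather than once per file. `matchesSourcePattern` below is a hypothetical stand-in for the Hadoop `GlobFilter` check the real code would use:

```scala
// Group file paths by their parent directory so the glob check runs per directory.
def dirsToCheck(filePaths: Seq[String]): Map[String, Seq[String]] =
  filePaths.groupBy { p =>
    val idx = p.lastIndexOf('/')
    if (idx <= 0) "/" else p.substring(0, idx)
  }

// Keep only files whose destination directory does NOT match the source glob,
// so archived files can't be re-read as new input by the streaming query.
def filesSafeToArchive(
    filePaths: Seq[String],
    matchesSourcePattern: String => Boolean): Seq[String] =
  dirsToCheck(filePaths).collect {
    case (dir, files) if !matchesSourcePattern(dir) => files
  }.flatten.toSeq
```

This trades exactness for fewer pattern evaluations: one check per distinct directory instead of one per file, which matters when a batch completes many files in the same partition directory.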
[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r341397990

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

@@ -53,6 +55,18 @@ class FileStreamSource(
     fs.makeQualified(new Path(path)) // can contain glob patterns
   }
+
+  private val sourceCleaner: FileStreamSourceCleaner = {

Review comment:
Yeah, that's a good suggestion. Will address.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r341395755

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

@@ -330,4 +362,77 @@ object FileStreamSource {
     def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+      fileSystem: FileSystem,
+      sourcePath: Path,
+      baseArchiveFileSystem: Option[FileSystem],
+      baseArchivePath: Option[Path]) extends Logging {
+    assertParameters()
+
+    private def assertParameters(): Unit = {
+      require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+      baseArchiveFileSystem.foreach { fs =>
+        require(fileSystem.getUri == fs.getUri, "Base archive path is located on a different " +
+          s"file system than the source files. source path: $sourcePath" +
+          s" / base archive path: ${baseArchivePath.get}")
+      }
+
+      baseArchivePath.foreach { path =>
+
+        /**
+         * FileStreamSource reads the files which one of below conditions is met:
+         * 1) file itself is matched with source path
+         * 2) parent directory is matched with source path
+         *
+         * Checking with glob pattern is costly, so set this requirement to eliminate the cases
+         * where the archive path can be matched with source path. For example, when file is moved
+         * to archive directory, destination path will retain input file's path as suffix, so
+         * destination path can't be matched with source path if archive directory's depth is longer
+         * than 2, as neither file nor parent directory of destination path can be matched with
+         * source path.
+         */
+        require(path.depth() > 2, "Base archive path must have a depth of at least 2 " +

Review comment:
So the error message should say 2 "subdirectories", not a "depth" of 2 - "/" counts toward its own depth. I don't think "depth" is a term end users are familiar with, so I'll remove the "a depth of" wording.
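For reference, Hadoop's `Path.depth()` returns the number of elements in the path, which is what the confusion above is about. A hedged pure-Scala mimic (assuming normalized absolute paths with no scheme or authority) illustrates the counting:

```scala
// Approximates org.apache.hadoop.fs.Path#depth for normalized absolute paths:
// "/" -> 0, "/data" -> 1, "/data/archive" -> 2, "/data/archive/2019" -> 3.
def depth(absolutePath: String): Int =
  absolutePath.split('/').count(_.nonEmpty)
```

Under this counting, "a depth of at least 2 subdirectories" is ambiguous: `/data/archive` already has depth 2, so a `depth() > 2` check would reject it, which is why rephrasing the message in terms of subdirectories rather than depth helps.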
[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r340311674

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

@@ -330,4 +362,77 @@ object FileStreamSource {
+        require(path.depth() > 2, "Base archive path must have a depth of at least 2 " +
+          "subdirectories. e.g. '/data/archive'")
+      }
+    }
+
+    def archive(entry: FileEntry): Unit = {
+      require(baseArchivePath.isDefined)
+
+      val curPath = new Path(new URI(entry.path))
+      val newPath = new Path(baseArchivePath.get, curPath.toUri.getPath.stripPrefix("/"))

Review comment:
Thanks for the quick feedback! I'll reflect it.
[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query

HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r340308673

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

@@ -330,4 +362,77 @@ object FileStreamSource {
+    def archive(entry: FileEntry): Unit = {
+      require(baseArchivePath.isDefined)
+
+      val curPath = new Path(new URI(entry.path))
+      val newPath = new Path(baseArchivePath.get, curPath.toUri.getPath.stripPrefix("/"))

Review comment:
I'm revisiting the two issues and I'm not sure there's a viable workaround. It looks like the issue pointed out is that ":" isn't a valid character in HDFS but may be valid on other filesystems, so the Path API doesn't restrict it, which leads to this problem. Even HDFS-14762 was closed as "Won't Fix". Would this only occur with `Path(parent, child)`, with `Path(pathstr)` being safe? Would it work if we manually concatenated the two paths as a string and passed that to Path's constructor?
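The string-concatenation workaround asked about above would look roughly like the sketch below (plain strings, no Hadoop involved). The idea, hedged as an assumption, is that `Path(parent, child)` parses the child as a URI, where a ":" early in the child can be misread as a scheme separator, while a single already-absolute string avoids that child-side parse:

```scala
// Builds the archive destination by plain string concatenation, mirroring
// new Path(baseArchivePath, curPath.toUri.getPath.stripPrefix("/")) without
// going through Path(parent, child)'s child-side URI parsing.
def archiveDestination(baseArchivePath: String, sourceFilePath: String): String = {
  val relative = sourceFilePath.stripPrefix("/")
  s"${baseArchivePath.stripSuffix("/")}/$relative"
}
```

A file name containing ":" (e.g. a timestamped log) survives the concatenation intact; whether the target filesystem then accepts that name is a separate question, as the comment notes.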
[GitHub] [spark] HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query URL: https://github.com/apache/spark/pull/22952#discussion_r339282089

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

## @@ -330,4 +362,139 @@ object FileStreamSource {
     def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+      fileSystem: FileSystem,
+      sourcePath: Path,
+      baseArchiveFileSystem: Option[FileSystem],
+      baseArchivePath: Option[Path]) extends Logging {
+    require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+    private val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePath)
+
+    private val sameFsSourceAndArchive: Boolean = {
+      baseArchiveFileSystem.exists { fs =>
+        if (fileSystem.getUri != fs.getUri) {
+          logWarning("Base archive path is located on a different file system than the " +
+            s"source, which is not supported. source path: $sourcePath / base archive path: " +
+            s"${baseArchivePath.get}")
+          false
+        } else {
+          true
+        }
+      }
+    }
+
+    /**
+     * This is a flag to skip matching the archived path against the source path.
+     *
+     * FileStreamSource reads the files for which one of the below conditions is met:
+     * 1) the file itself matches the source path
+     * 2) the parent directory matches the source path
+     *
+     * Checking with a glob pattern is costly, so this flag leverages the above information to
+     * prune the cases where the file cannot match the source path. For example, when a file is
+     * moved to the archive directory, the destination path retains the input file's path as a
+     * suffix, so the destination path can't match the source path if the archive directory's
+     * depth is greater than 2, as neither the file nor the parent directory of the destination
+     * path can then match the source path.
+     */
+    private val skipCheckingGlob: Boolean = baseArchivePath.exists(_.depth() > 2)
+
+    def archive(entry: FileEntry): Unit = {
+      require(baseArchivePath.isDefined)
+
+      if (sameFsSourceAndArchive) {
+        val curPath = new Path(new URI(entry.path))
+        val newPath = new Path(baseArchivePath.get, curPath.toUri.getPath.stripPrefix("/"))
+
+        if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) {

Review comment: If I were an end user and my query failed because of the glob path, I might be puzzled and want to know why; that only happens when we allow `depth == 2`, and I think we've agreed `depth > 2` doesn't need further explanation. I'll apply it.
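The depth pruning in the quoted code can be sketched without Hadoop. Hadoop's `Path.depth()` is the number of name components ("/" is 0, "/data" is 1, "/data/archive" is 2); the helper names below are made up for illustration:

```scala
// Simplified stand-in for Hadoop Path.depth(): count non-empty name components.
def depth(path: String): Int = path.split("/").count(_.nonEmpty)

// The pruning flag from the review: glob matching can be skipped entirely when
// the base archive directory is more than two levels deep, because the archived
// file can then match neither the source pattern nor its parent directory.
def skipCheckingGlob(baseArchiveDir: String): Boolean = depth(baseArchiveDir) > 2
```

So "/data/archive" (depth 2) would still require the glob check, while "/data/archive/run1" (depth 3) would not.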
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query URL: https://github.com/apache/spark/pull/22952#discussion_r338960691

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

## @@ -330,4 +362,139 @@ (same FileStreamSourceCleaner hunk as quoted above, at the `if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) {` line)

Review comment: Technically, we only need to check the pattern when the depth of the base archive path is 2. The depth of the base archive path cannot be 1, because then the archive file would always refer to the same path as the original source file. (Yes, I was aware of this but forgot to apply the fact to the pruning.) So: `depth > 2` means no check is needed, `depth == 1` always matches, and `depth == 2` requires checking the pattern. We could even simplify the per-file check to whether the first directory name of the base archive path matches the first directory name (including glob) of the source path. That wouldn't guarantee that all archive paths match the source path, which is why I did a full pattern match per file, but if we really don't want to do pattern matching per file, yes, we can do it. What concerns me is not really a technical issue. If we want to fail the query instead of skipping the archive file, we must ensure end users understand why the query failed and how to fix it. I had to explain to most reviewers how the pattern check works, and even then I feel reviewers aren't comfortable with it; I'm unsure it would work for end users, though we could add some guidance on how it works. So we need to simplify the condition as much as possible if we really want to fail the query. The amount of explanation needed for `depth > 2` versus `depth >= 2` differs significantly, and the actual harm of disallowing `depth == 2` is only that users need to create a subdirectory and use that, which doesn't seem critical. Does that make sense?
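The three-way case analysis in the comment above can be written out directly (the type and helper names here are hypothetical, not from the PR):

```scala
// depth > 2  -> archived files can never match the source glob: no check needed
// depth == 1 -> the archive root is "/", so the archived file always lands back
//               on the original source path: always a conflict
// depth == 2 -> the only case where a pattern check is actually required
sealed trait ArchivePathDecision
case object NoCheckNeeded extends ArchivePathDecision
case object AlwaysConflicts extends ArchivePathDecision
case object NeedsPatternCheck extends ArchivePathDecision

def decide(archiveDirDepth: Int): ArchivePathDecision =
  if (archiveDirDepth > 2) NoCheckNeeded
  else if (archiveDirDepth <= 1) AlwaysConflicts
  else NeedsPatternCheck
```

Requiring `depth > 2` up front collapses all three cases into a single config-time check, which is the simplification being argued for.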
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query URL: https://github.com/apache/spark/pull/22952#discussion_r338847897

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

## @@ -330,4 +362,139 @@ (same FileStreamSourceCleaner hunk as quoted above, at the `if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) {` line)

Review comment: > I don't fully understand the matching you explained, but my guess is that you could end up with an archived file called "/hello/hello1/hell" and that somehow matches the pattern?

Exactly. We cannot predict whether the first depth of the source path has a wildcard. Yes, we could also prune the case where the first directory of the source path doesn't contain a wildcard, but that feels more and more complicated. Since we really want to simplify this, how about just forcing baseArchivePath to have a depth of more than 2? (I feel that would be harmless in production; if users hit the case, they can just create a subdirectory.) Then we can eliminate the pattern check entirely and throw an error in the config validation phase. WDYT?
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query URL: https://github.com/apache/spark/pull/22952#discussion_r338837940

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

## @@ -330,4 +362,139 @@ (same FileStreamSourceCleaner hunk as quoted above, at the `if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) {` line)

Review comment: OK, looking at the comment again, I only understood a small amount of it. Sorry about that. ~~The source path may contain a wildcard in any subpath, so only the depth check can be optimized. Please correct me if I'm missing something.~~ Sorry, never mind; I just got your point about further pruning the case. If baseArchivePath doesn't match the source path, there's no need to check further. Will add. Btw, I guess we still need this check when baseArchivePath does match the source path, but please let me know if you see a verification approach that would let us eliminate it entirely.
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query URL: https://github.com/apache/spark/pull/22952#discussion_r338837229

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

## @@ -330,4 +362,139 @@ (same FileStreamSourceCleaner hunk as quoted above, at the `if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) {` line)

Review comment: Not sure I understand; could you elaborate? I agreed with your suggestion, since this shouldn't need to be calculated per path, but I don't get anything beyond that. If you meant moving skipCheckingGlob into config validation, I'm not sure I like it: it's just an optimization inside FileStreamSourceCleaner, an implementation detail, so there's no need to expose it to the outside.
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query URL: https://github.com/apache/spark/pull/22952#discussion_r338835194

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

## @@ -330,4 +362,139 @@ (same FileStreamSourceCleaner hunk as quoted above, at the `private val sameFsSourceAndArchive: Boolean = {` line)

Review comment: Oh, OK. Failing the query sounds fine, and it might even be better than realizing later that the configuration doesn't work because it was misconfigured or unsupported. I'll make the change.
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query URL: https://github.com/apache/spark/pull/22952#discussion_r338334551

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

## @@ -330,4 +353,137 @@ object FileStreamSource {
     def size: Int = map.size()
   }
+
+  private[sql] class FileStreamSourceCleaner(
+      fileSystem: FileSystem,
+      sourcePath: Path,
+      baseArchivePathString: Option[String]) extends Logging {
+
+    private val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePath)
+
+    private val baseArchivePath: Option[Path] = baseArchivePathString.map(new Path(_))
+
+    def archive(entry: FileEntry): Unit = {
+      require(baseArchivePath.isDefined)
+
+      val curPath = new Path(new URI(entry.path))
+      val curPathUri = curPath.toUri
+
+      val newPath = buildArchiveFilePath(curPathUri)
+
+      if (isArchiveFileMatchedAgainstSourcePattern(newPath)) {
+        logWarning(s"Fail to move $curPath to $newPath - destination matches " +
+          s"to source path/pattern. Skip moving file.")
+      } else {
+        doArchive(curPath, newPath)
+      }
+    }
+
+    def delete(entry: FileEntry): Unit = {
+      val curPath = new Path(new URI(entry.path))
+      try {
+        logDebug(s"Removing completed file $curPath")
+
+        if (!fileSystem.delete(curPath, false)) {
+          logWarning(s"Fail to remove $curPath / skip removing file.")
+        }
+      } catch {
+        case NonFatal(e) =>
+          // Log the error but swallow the exception to avoid stopping the process
+          logWarning(s"Fail to remove $curPath / skip removing file.", e)
+      }
+    }
+
+    private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+      val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+      var currentPath = sourcePath
+      while (!currentPath.isRoot) {
+        filters += new GlobFilter(currentPath.getName)
+        currentPath = currentPath.getParent
+      }
+
+      filters.toList
+    }
+
+    private def buildArchiveFilePath(pathUri: URI): Path = {
+      require(baseArchivePath.isDefined)
+      new Path(baseArchivePath.get, pathUri.getPath.stripPrefix("/"))
+    }
+
+    /**
+     * This method checks whether the destination of the archive file would be under the source
+     * path (which contains a glob) to prevent the possibility of overwriting/re-reading it as
+     * input.
+     *
+     * FileStreamSource reads the files for which one of the below conditions is met:
+     * 1) the file itself matches the source path
+     * 2) the parent directory matches the source path
+     *
+     * Checking with a glob pattern is costly, so this method leverages the above information
+     * to prune the cases where the file cannot match the source path. For example, when a file
+     * is moved to the archive directory, the destination path retains the input file's path as
+     * a suffix, so the destination path can't match the source path if the archive directory's
+     * depth is greater than 2, as neither the file nor the parent directory of the destination
+     * path can then match the source path.
+     */
+    private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): Boolean = {
+      if (baseArchivePath.get.depth() > 2) {

Review comment: This optimization relies on a simple fact: a glob path only covers files at a specific depth (there's no notation like `**` that would match multiple depths of directories), so the depth of possibly matched paths is deterministic once you know the glob path. Suppose you have combined the path; the only cases where the archive path could match the glob path are: 1) direct match, which is only possible if the base archive path is just '/', i.e. depth 1; 2) parent-directory match, a direct subdirectory under / (like `/a`), where the depth of the base archive path would be 2. Does that make sense?

> is there a reason why you need to check this for every archived path? Can't you, during config validation, perform this check once, with some path generated to match the source pattern, and then declare that there is no conflict between the archive path and the source path?

No, that's what I've been missing. You're right that it can be determined once at initialization. Nice finding.
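The "fixed depth" fact the comment relies on can be demonstrated with the JDK's glob matcher, where `*` likewise never crosses a `/` boundary. The paths and pattern below are made up for illustration; with an archive directory more than two levels deep, the archived copy can match the source glob neither as a file (condition 1) nor via its parent directory (condition 2):

```scala
import java.nio.file.{FileSystems, Paths}

// Hypothetical source glob: matches only paths of exactly this depth.
val sourceGlob = FileSystems.getDefault.getPathMatcher("glob:/data/*/*.json")

val original = Paths.get("/data/run1/f.json")
val archived = Paths.get("/archived/here/data/run1/f.json")

val originalMatches = sourceGlob.matches(original)           // would be re-read as input
val archivedMatches = sourceGlob.matches(archived)           // file itself cannot match
val parentMatches   = sourceGlob.matches(archived.getParent) // nor can its parent
```

Since the archive prefix pushes every archived path past the glob's fixed depth, the per-file check becomes unnecessary, which is exactly the `depth > 2` pruning.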
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query URL: https://github.com/apache/spark/pull/22952#discussion_r338347720

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

## @@ -330,4 +353,137 @@ (continuing the FileStreamSourceCleaner hunk quoted above into the body of isArchiveFileMatchedAgainstSourcePattern)
+    private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): Boolean = {
+      if (baseArchivePath.get.depth() > 2) {
+        // there's no chance for the archive file to match the source pattern
+        return false
+      }
+
+      var matched = true
+
+      // The new path will never match the source path when its depth is outside the range
+      // [depth of source path, depth of source path + 1], because source files are picked
+      // when they match the source pattern or their parent directories match it.
+      val depthSourcePattern = sourceGlobFilters.length
+      val depthArchiveFile = archiveFile.depth()
+
+      // we already checked against the depth of the archive path, but rechecking wouldn't hurt
+      if (depthArchiveFile < depthSourcePattern || depthArchiveFile > depthSourcePattern + 1) {
+        // never matched
+        matched = false
+      } else {
+        var pathToCompare = if (depthArchiveFile == depthSourcePattern + 1) {
+          archiveFile.getParent
+        } else {
+          archiveFile
+        }
+
+        // Now pathToCompare should have the same depth as sourceGlobFilters.length
+        var index = 0
+        do {
+          // GlobFilter only matches against the name, not the full path, so it's safe to compare
+          if (!sourceGlobFilters(index).accept(pathToCompare)) {
+            matched = false
+          } else {
+            pathToCompare = pathToCompare.getParent
+            index += 1
+          }
+        } while (matched && !pathToCompare.isRoot)
+      }
+
+      matched
+    }
+
+    private def doArchive(sourcePath: Path, archivePath: Path): Unit = {
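The component-by-component matching above can be sketched with the JDK alone. Hadoop's `GlobFilter` matches only a path's last name, so the source pattern is split into one matcher per component (leaf first) and the candidate is walked upward in lockstep; the helper names and patterns here are hypothetical, not Spark code:

```scala
import java.nio.file.{FileSystems, PathMatcher, Paths}

// One glob matcher per name component of the source pattern, leaf first
// (mirrors buildSourceGlobFilters in the quoted diff).
def buildSourceGlobFilters(sourcePattern: String): List[PathMatcher] = {
  val fs = FileSystems.getDefault
  var current = Paths.get(sourcePattern)
  var filters = List.empty[PathMatcher]
  while (current != null && current.getFileName != null) {
    filters = filters :+ fs.getPathMatcher("glob:" + current.getFileName.toString)
    current = current.getParent
  }
  filters
}

// Mirrors the depth pruning plus the per-component walk: a candidate matches
// either directly (same depth) or via its parent directory (one level deeper).
def matchesSourcePattern(candidate: String, filters: List[PathMatcher]): Boolean = {
  val path = Paths.get(candidate)
  val depth = path.getNameCount
  if (depth < filters.length || depth > filters.length + 1) return false
  var current = if (depth == filters.length + 1) path.getParent else path
  filters.forall { filter =>
    val ok = filter.matches(current.getFileName)
    current = current.getParent
    ok
  }
}
```

For a pattern like `/data/*/*.json`, `/data/run1/f.json` matches while `/archived/here/data/run1/f.json` is rejected by the depth check before any glob work is done.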
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query URL: https://github.com/apache/spark/pull/22952#discussion_r338338952

## File path: docs/structured-streaming-programming-guide.md

## @@ -546,6 +546,13 @@ Here are the details of all the sources in Spark.
         "s3://a/dataset.txt"
         "s3n://a/b/dataset.txt"
         "s3a://a/b/c/dataset.txt"
+        cleanSource: option to clean up completed files after processing.
+        Available options are "archive", "delete", "off". If the option is not provided, the default value is "off".
+        When "archive" is provided, the additional option sourceArchiveDir must be provided as well. The value of "sourceArchiveDir" must be outside of the source path, to ensure archived files are never included as new source files again.
+        Spark will move source files while preserving their own path. For example, if the path of a source file is "/a/b/dataset.txt" and the path of the archive directory is "/archived/here", the file will be moved to "/archived/here/a/b/dataset.txt".
+        NOTE: Both archiving (via moving) and deleting completed files introduce overhead (slowdown) in each micro-batch, so make sure you understand the cost of each operation in your file system before enabling this option. On the other hand, enabling this option reduces the cost of listing source files, which can be an expensive operation.
+        NOTE 2: The source path should not be used by multiple sources or queries when this option is enabled, because source files will be moved or deleted, which may impact the other sources and queries.

Review comment: Revisiting the comment thoroughly: yes, you're right, that was the intention. OK to remove the latter part.
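The destination-path rule described in the quoted docs (the archived file keeps the source file's own path as a suffix under the archive directory) boils down to a simple string join; this is a minimal sketch with a hypothetical helper name, not the Spark implementation:

```scala
// Archive destination: archive dir + the source file's absolute path as suffix.
def archiveDestination(sourceArchiveDir: String, sourceFilePath: String): String =
  sourceArchiveDir.stripSuffix("/") + "/" + sourceFilePath.stripPrefix("/")

val dest = archiveDestination("/archived/here", "/a/b/dataset.txt")
// "/archived/here/a/b/dataset.txt", matching the example in the docs
```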
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
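The path mapping the documentation describes (the source file's own path appended under the archive directory) can be sketched in Python. This is an illustration of the rule, not Spark's actual implementation; the helper name is invented:

```python
from urllib.parse import urlparse
import posixpath

def archive_destination(source_file: str, archive_dir: str) -> str:
    """Place a completed source file under the archive directory while
    preserving the file's own path, as the cleanSource docs describe."""
    # Keep only the path component in case the input is a URI like hdfs://host/a/b
    src_path = urlparse(source_file).path
    # Strip the leading '/' so the join treats it as a relative child path
    return posixpath.join(archive_dir, src_path.lstrip("/"))

print(archive_destination("/a/b/dataset.txt", "/archived/here"))
# /archived/here/a/b/dataset.txt
```

Note how the full source path survives as a suffix of the destination; this suffix property is what the depth-based safety check discussed later in this thread relies on.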
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338336181

File path: docs/structured-streaming-programming-guide.md (quotes the same cleanSource/sourceArchiveDir documentation hunk as the comment above)

Review comment: This is a reflection of previous review comment https://github.com/apache/spark/pull/22952#discussion_r237314459
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338334551

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

@@ -330,4 +353,137 @@ object FileStreamSource { def size: Int = map.size() }

The hunk adds FileStreamSourceCleaner:

  private[sql] class FileStreamSourceCleaner(
      fileSystem: FileSystem,
      sourcePath: Path,
      baseArchivePathString: Option[String]) extends Logging {

    private val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePath)

    private val baseArchivePath: Option[Path] = baseArchivePathString.map(new Path(_))

    def archive(entry: FileEntry): Unit = {
      require(baseArchivePath.isDefined)

      val curPath = new Path(new URI(entry.path))
      val curPathUri = curPath.toUri

      val newPath = buildArchiveFilePath(curPathUri)

      if (isArchiveFileMatchedAgainstSourcePattern(newPath)) {
        logWarning(s"Fail to move $curPath to $newPath - destination matches " +
          s"to source path/pattern. Skip moving file.")
      } else {
        doArchive(curPath, newPath)
      }
    }

    def delete(entry: FileEntry): Unit = {
      val curPath = new Path(new URI(entry.path))
      try {
        logDebug(s"Removing completed file $curPath")

        if (!fileSystem.delete(curPath, false)) {
          logWarning(s"Fail to remove $curPath / skip removing file.")
        }
      } catch {
        case NonFatal(e) =>
          // Log to warning but swallow exception to avoid process being stopped
          logWarning(s"Fail to remove $curPath / skip removing file.", e)
      }
    }

    private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
      val filters = new scala.collection.mutable.MutableList[GlobFilter]()

      var currentPath = sourcePath
      while (!currentPath.isRoot) {
        filters += new GlobFilter(currentPath.getName)
        currentPath = currentPath.getParent
      }

      filters.toList
    }

    private def buildArchiveFilePath(pathUri: URI): Path = {
      require(baseArchivePath.isDefined)
      new Path(baseArchivePath.get, pathUri.getPath.stripPrefix("/"))
    }

    /**
     * This method checks whether the destination of the archive file will be under the source
     * path (which contains a glob) to prevent the possibility of overwriting/re-reading it
     * as input.
     *
     * FileStreamSource reads the files for which one of the below conditions is met:
     * 1) the file itself matches the source path
     * 2) the parent directory matches the source path
     *
     * Checking with a glob pattern is costly, so this method leverages the above information
     * to prune the cases where the file cannot match the source path. For example, when a
     * file is moved to the archive directory, the destination path will retain the input
     * file's path as a suffix, so the destination path can't match the source path if the
     * archive directory's depth is greater than 2, as neither the file nor the parent
     * directory of the destination path can match the source path.
     */
    private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): Boolean = {
      if (baseArchivePath.get.depth() > 2) {

Review comment: This optimization deals with a simple fact: a glob path only covers files at a specific depth (there is no notation like `**` which would match multiple depths of directories), so the depth of possibly matched paths is deterministic once you know the glob path. Given that we move the source file into the archive path, which is a directory, the depth increases by 1. And since FileStreamSource also allows the "parent directory" of a source file to match the glob path, the possible matching depths are "the depth of the glob path" + 1 and "the depth of the glob path" + 2. Does it make sense?

> is there a reason why you need to check this for every archived path? Can't you, during config validation, perform this check once, with some path generated to match the source pattern, and then declare that there is no conflict between the archive path and the source path?

No, that's what I've been missing. You're right, it can be determined once we initialize. Nice finding.
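The depth argument above can be illustrated with a small Python sketch. This is a simplified model (a glob without `**` matches only paths of exactly its own depth); the function names are illustrative, not Spark's:

```python
def depth(path: str) -> int:
    """Number of path components, e.g. depth('/a/b/c.txt') == 3."""
    return len([p for p in path.strip("/").split("/") if p])

def may_match_source_glob(archived_file: str, source_glob: str) -> bool:
    # A glob without '**' only matches paths of exactly its own depth, and
    # FileStreamSource also accepts a file whose parent directory matches the
    # glob. So an archived file can only collide when its depth equals the
    # glob's depth or the glob's depth + 1; any deeper destination is safe.
    d, g = depth(archived_file), depth(source_glob)
    return d == g or d == g + 1

# Archiving "/a/b/dataset.txt" under "/archive/d1/d2" (depth 3) yields a
# destination of depth 6, while the glob "/a/*/*.txt" has depth 3: no collision.
print(may_match_source_glob("/archive/d1/d2/a/b/dataset.txt", "/a/*/*.txt"))  # False
```

Because the archived destination always keeps the source file's path as a suffix, its depth is the archive directory's depth plus the source file's depth, which is why a sufficiently deep archive directory can be declared safe once, up front.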
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r335220635

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

This comment quotes an earlier revision of the same FileStreamSourceCleaner hunk; the discussion targets buildArchiveFilePath, which at this point normalized the archive dir string and concatenated paths by hand:

  private def buildArchiveFilePath(pathUri: URI): Path = {
    require(baseArchivePathString.isDefined)
    val baseArchivePathStr = baseArchivePathString.get
    val normalizedBaseArchiveDirPath = if (baseArchivePathStr.endsWith("/")) {
      baseArchivePathStr.substring(0, baseArchivePathStr.length - 1)
    } else {
      baseArchivePathStr
    }

    new Path(normalizedBaseArchiveDirPath + pathUri.getPath)
  }

Review comment: Ah I see. I got the point - making the child relative (by removing the leading '/') and letting the Path constructor resolve it. I hadn't realized the trick would work. Nice suggestion.
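The "make the child relative" trick suggested here can be demonstrated with Python's posixpath, whose join behaves analogously to resolving a child path against a parent (this is an analogy, not Hadoop's Path class itself):

```python
import posixpath

parent = "/archived/here"
child = "/a/b/dataset.txt"

# With an absolute child, a resolving join ignores the parent entirely:
print(posixpath.join(parent, child))              # /a/b/dataset.txt
# Stripping the leading '/' makes the child relative, so it lands under parent:
print(posixpath.join(parent, child.lstrip("/")))  # /archived/here/a/b/dataset.txt
```

This is why the later revision of buildArchiveFilePath could drop the manual trailing-slash normalization: `new Path(baseArchivePath.get, pathUri.getPath.stripPrefix("/"))` lets the constructor do the resolution.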
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r335217980

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala (same FileStreamSourceCleaner hunk as above; the discussion targets the documentation of isArchiveFileMatchedAgainstSourcePattern)

Review comment: I didn't mean it shouldn't be explained in a scaladoc on the method - I meant the method already has comments near the code lines that explain it. Sounds like the preference is adding a scaladoc in this case; I'll leave a comment.
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r334720763

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala (same FileStreamSourceCleaner hunk as above; the discussion targets `new Path(normalizedBaseArchiveDirPath + pathUri.getPath)` in buildArchiveFilePath)

Review comment:
> Is pathUri guaranteed to contain an absolute path?

Yes, FileStreamSource qualifies the path when listing files.

> Because then this is basically new Path(parentPath, pathUri.toString.substring(1)), which also may mean you can clean up a bunch of this code.

`.getPath` is used because the URI could have a scheme part, in which case `pathUri.toString.substring(1)` wouldn't work as expected.
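The point about `.getPath` versus chopping the first character of the URI string can be shown with Python's urllib.parse as a stand-in for java.net.URI (the URI value is an invented example):

```python
from urllib.parse import urlparse

uri = "hdfs://namenode:8020/a/b/dataset.txt"

# Dropping the first character of the full URI string breaks when a
# scheme/authority is present:
print(uri[1:])             # dfs://namenode:8020/a/b/dataset.txt  (nonsense)
# Taking only the path component works regardless of scheme/authority:
print(urlparse(uri).path)  # /a/b/dataset.txt
```

For a plain absolute path like "/a/b/dataset.txt" the two approaches coincide, which is why the shortcut looks tempting until a fully qualified URI shows up.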
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r334718819

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala (same FileStreamSourceCleaner hunk as above; the discussion targets isArchiveFileMatchedAgainstSourcePattern)

Review comment:
> My guess is that it's checking whether the archive dir is under the source directory?

It checks whether the destination of the archive "file" will be under the source path (which contains a glob), to prevent the possibility of overwriting/re-reading it as input. So the method name actually contains everything - if `SourcePattern` isn't a familiar representation, I'll change it to `SourcePath`. If we prefer `MatchedWith` over `MatchedAgainst`, I'll change it. If we agree the method name explains what it is doing, we might be able to skip adding a doc - especially as it's a private method. It may be worth deciding where the best place is to explain the details - I've added comments near the code lines, but I can move them to a scaladoc if we prefer that.

> This looks way too complicated for a simple check like that.

The method becomes complicated for two reasons:

1) There's a tricky part in FileStreamSource: it doesn't only match the files which match the source path, but also the files whose parent directory matches the source path. So we should consider both cases: 1) the file itself is matched, 2) the parent directory is matched. Please refer to the comments in the test code: https://github.com/apache/spark/blob/bd8da3799dd160771ebb3ea55b7678b644248425/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala#L736-L778

2) Checking with a glob pattern is costly, so we would like to avoid it by leveraging known information where possible. For example, when a file is moved to the archive directory, the destination path retains the input file's path as a suffix, so the destination path can't match the source path if the archive directory's depth is greater than 2. (Neither the file nor the parent directory of the destination path can match the source path.)
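The two matching cases described in point 1) can be sketched in Python. The component-wise glob match mirrors the buildSourceGlobFilters approach (one GlobFilter per path component, no `**`); the function names and paths are illustrative:

```python
from fnmatch import fnmatch

def matches_glob(path: str, glob: str) -> bool:
    """Component-wise glob match: depths must be equal and every component
    must match its pattern (fnmatch alone would let '*' cross '/')."""
    parts = [p for p in path.strip("/").split("/") if p]
    pats = [p for p in glob.strip("/").split("/") if p]
    return len(parts) == len(pats) and all(
        fnmatch(part, pat) for part, pat in zip(parts, pats))

def is_read_by_source(file_path: str, source_glob: str) -> bool:
    # FileStreamSource accepts a file when either the file itself or its
    # parent directory matches the source glob.
    parent = "/" + "/".join(file_path.strip("/").split("/")[:-1])
    return matches_glob(file_path, source_glob) or matches_glob(parent, source_glob)

print(is_read_by_source("/data/2024/part-0.txt", "/data/*"))  # True (parent matches)
print(is_read_by_source("/data/part-0.txt", "/data/*"))       # True (file matches)
print(is_read_by_source("/other/x/part-0.txt", "/data/*"))    # False
```

The parent-directory case is exactly why the safety check must consider two candidate depths rather than one.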
HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS] Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r334714556

File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

@@ -258,16 +264,33 @@ class FileStreamSource(
     * equal to `end` and will only request offsets greater than `end` in the future.
     */
    override def commit(end: Offset): Unit = {
  -   // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
  -   // and the value of the maxFileAge parameter.
  +   val logOffset = FileStreamSourceOffset(end).logOffset
  +
  +   if (sourceOptions.cleanSource != CleanSourceMode.NO_OP) {

Review comment: That was suggested earlier and we agreed to deal with it in a follow-up issue - the PR has just been sitting longer than expected. Could we deal with it in a follow-up issue? Btw, if we don't guarantee the cleanup and do it with best effort (which we actually already do), it won't matter much to do it with a background thread - except for the problem you've mentioned. That could be remedied by retaining a max amount of paths to clean up (yes, this is based on "best effort").
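The follow-up idea floated here (move cleanup off the commit path onto a background thread, capping the backlog so best-effort cleanup cannot grow unbounded) can be sketched in Python. The class and parameter names are invented for illustration; this is not Spark's implementation:

```python
import queue
import threading

class BestEffortCleaner:
    """Hand completed-file paths to a background thread; drop work when the
    bounded backlog is full, since cleanup is best effort anyway."""

    def __init__(self, clean_fn, max_pending=10000):
        self._q = queue.Queue(maxsize=max_pending)
        self._clean = clean_fn
        threading.Thread(target=self._run, daemon=True).start()

    def submit(self, path: str) -> bool:
        try:
            self._q.put_nowait(path)  # non-blocking: commit() never stalls
            return True
        except queue.Full:
            return False              # backlog capped: silently skip this file

    def _run(self):
        while True:
            path = self._q.get()
            try:
                self._clean(path)
            except Exception:
                pass  # swallow, mirroring the "log and continue" behavior
            finally:
                self._q.task_done()

cleaned = []
cleaner = BestEffortCleaner(cleaned.append)
cleaner.submit("/data/old/part-0.txt")
cleaner._q.join()  # demo only: wait for the background thread to drain
print(cleaned)
```

The bounded queue is the remedy mentioned in the comment: the micro-batch never blocks on cleanup, and memory use stays capped at the cost of occasionally skipping a file.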