HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS]
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r338960691
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
##########
@@ -330,4 +362,139 @@ object FileStreamSource {
def size: Int = map.size()
}
+
+ private[sql] class FileStreamSourceCleaner(
+ fileSystem: FileSystem,
+ sourcePath: Path,
+ baseArchiveFileSystem: Option[FileSystem],
+ baseArchivePath: Option[Path]) extends Logging {
+ require(baseArchiveFileSystem.isDefined == baseArchivePath.isDefined)
+
+ private val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePath)
+
+ private val sameFsSourceAndArchive: Boolean = {
+ baseArchiveFileSystem.exists { fs =>
+ if (fileSystem.getUri != fs.getUri) {
+ logWarning("Base archive path is located to the different filesystem with source, " +
+ s"which is not supported. source path: ${sourcePath} / base archive path: " +
+ s"${baseArchivePath.get}")
+ false
+ } else {
+ true
+ }
+ }
+ }
+
+ /**
+ * This is a flag to skip matching archived path with source path.
+ *
+ * FileStreamSource reads the files which one of below conditions is met:
+ * 1) file itself is matched with source path
+ * 2) parent directory is matched with source path
+ *
+ * Checking with glob pattern is costly, so this flag leverages above information to prune
+ * the cases where the file cannot be matched with source path. For example, when file is
+ * moved to archive directory, destination path will retain input file's path as suffix,
+ * so destination path can't be matched with source path if archive directory's depth is
+ * longer than 2, as neither file nor parent directory of destination path can be matched
+ * with source path.
+ */
+ private val skipCheckingGlob: Boolean = baseArchivePath.exists(_.depth() > 2)
+
+ def archive(entry: FileEntry): Unit = {
+ require(baseArchivePath.isDefined)
+
+ if (sameFsSourceAndArchive) {
+ val curPath = new Path(new URI(entry.path))
+ val newPath = new Path(baseArchivePath.get, curPath.toUri.getPath.stripPrefix("/"))
+
+ if (!skipCheckingGlob && pathMatchesSourcePattern(newPath)) {
Review comment:
Technically, we only need to check the pattern when the depth of the base
archive path is 2. The depth of the base archive path cannot be 1, because then
the archived file would always resolve to the same path as the original source
file. (Yes, I was aware of this but forgot to use it for pruning.)
So `depth > 2` -> no need to check, `depth == 1` -> always matches, `depth
== 2` -> need to check the pattern. We could even simplify this to avoid
checking per file, by checking whether the first directory name of the base
archive path matches the first directory name (which may include a glob) of the
source path. That wouldn't guarantee that every archive path matches the source
path, which is why I did the full pattern match per file, but if we really
don't want to do pattern matching per file, yes, we can do it.
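To make the case split above concrete, here is a minimal sketch in Scala. The names `GlobCheck`, `NoCheckNeeded`, `AlwaysMatches`, `CheckPerFile` and `ArchiveDepthSketch.classify` are hypothetical and not part of the PR. The depth is passed in as a plain `Int` so the sketch makes no claim about how it is computed, and lumping every depth below 2 into the "always matches" case is an assumption of the sketch.

```scala
// Hypothetical names: this only restates the three cases from the comment.
sealed trait GlobCheck
case object NoCheckNeeded extends GlobCheck // depth > 2: archived path can never match
case object AlwaysMatches extends GlobCheck // depth == 1: archived path equals the source path
case object CheckPerFile extends GlobCheck  // depth == 2: full pattern match is required

object ArchiveDepthSketch {
  // baseArchiveDepth is the depth of the base archive path, supplied by the caller.
  def classify(baseArchiveDepth: Int): GlobCheck =
    if (baseArchiveDepth > 2) NoCheckNeeded
    else if (baseArchiveDepth == 2) CheckPerFile
    else AlwaysMatches // assumption: anything shallower than 2 is treated like depth == 1
}
```

Only the `CheckPerFile` case would still pay for a glob match on each archived file.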
My concern is not really a technical one. If we want to just yell and fail the
query instead of skipping the archived file, we must make sure end users
understand why the query failed and how to fix it. I had to explain to most of
the reviewers how the pattern check works, and even so I have the feeling that
reviewers aren't comfortable with it. I'm not sure it would work for end users,
though we can add some guidance on how it works.
So we will need to simplify the condition as much as possible if we really
want to just fail the query. The difference in the amount of explanation needed
for `depth > 2` versus `depth >= 2` is significant, and the actual harm of not
allowing `depth == 2` is only that users need to create a subdirectory and use
it, which doesn't seem critical.
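As a rough illustration of the simplified fail-fast option, the sketch below rejects any base archive path whose depth is not greater than 2 and tells the user how to fix it. `ArchiveDirValidationSketch.validate` and the message wording are hypothetical, not what the PR implements, and the depth is again passed in rather than computed.

```scala
object ArchiveDirValidationSketch {
  // Hypothetical helper: one easy-to-explain condition, checked once per query
  // instead of once per file. `depth` is supplied by the caller.
  def validate(baseArchivePath: String, depth: Int): Unit =
    require(
      depth > 2,
      s"Base archive path '$baseArchivePath' has depth $depth, but a depth greater " +
        "than 2 is required so that archived files can never match the source path. " +
        "Create a subdirectory under it and use that subdirectory instead.")
}
```

With this shape, a failing query surfaces a single `IllegalArgumentException` whose message states both the rule and the workaround.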
Does it make sense?