HeartSaVioR commented on a change in pull request #22952: [SPARK-20568][SS]
Provide option to clean up completed files in streaming query
URL: https://github.com/apache/spark/pull/22952#discussion_r334718819
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
##########
@@ -330,4 +353,129 @@ object FileStreamSource {
def size: Int = map.size()
}
+
+ private[sql] class FileStreamSourceCleaner(
+ fileSystem: FileSystem,
+ sourcePath: Path,
+ baseArchivePathString: Option[String]) extends Logging {
+
+ private val sourceGlobFilters: Seq[GlobFilter] = buildSourceGlobFilters(sourcePath)
+
+ private val baseArchivePath: Option[Path] = baseArchivePathString.map(new Path(_))
+
+ def archive(entry: FileEntry): Unit = {
+ require(baseArchivePath.isDefined)
+
+ val curPath = new Path(new URI(entry.path))
+ val curPathUri = curPath.toUri
+
+ val newPath = buildArchiveFilePath(curPathUri)
+
+ if (isArchiveFileMatchedAgainstSourcePattern(newPath)) {
+ logWarning(s"Fail to move $curPath to $newPath - destination matches " +
+ s"to source path/pattern. Skip moving file.")
+ } else {
+ doArchive(curPath, newPath)
+ }
+ }
+
+ def delete(entry: FileEntry): Unit = {
+ val curPath = new Path(new URI(entry.path))
+ try {
+ logDebug(s"Removing completed file $curPath")
+
+ if (!fileSystem.delete(curPath, false)) {
+ logWarning(s"Fail to remove $curPath / skip removing file.")
+ }
+ } catch {
+ case NonFatal(e) =>
+ // Log to error but swallow exception to avoid process being stopped
+ logWarning(s"Fail to remove $curPath / skip removing file.", e)
+ }
+ }
+
+ private def buildSourceGlobFilters(sourcePath: Path): Seq[GlobFilter] = {
+ val filters = new scala.collection.mutable.MutableList[GlobFilter]()
+
+ var currentPath = sourcePath
+ while (!currentPath.isRoot) {
+ filters += new GlobFilter(currentPath.getName)
+ currentPath = currentPath.getParent
+ }
+
+ filters.toList
+ }
+
+ private def buildArchiveFilePath(pathUri: URI): Path = {
+ require(baseArchivePathString.isDefined)
+ val baseArchivePathStr = baseArchivePathString.get
+ val normalizedBaseArchiveDirPath = if (baseArchivePathStr.endsWith("/")) {
+ baseArchivePathStr.substring(0, baseArchivePathStr.length - 1)
+ } else {
+ baseArchivePathStr
+ }
+
+ new Path(normalizedBaseArchiveDirPath + pathUri.getPath)
+ }
+
+ private def isArchiveFileMatchedAgainstSourcePattern(archiveFile: Path): Boolean = {
Review comment:
> My guess is that it's checking whether the archive dir is under the source directory?
It checks whether the destination of the archived "file" would fall under the
source path (which may contain a glob), to prevent the possibility of
overwriting files or re-reading them as input.
So the method name actually says everything. If `SourcePattern` isn't a
familiar term, I'll change it to `SourcePath`. If we prefer `MatchedWith`
over `MatchedAgainst`, I'll change that too.
If we agree the method name explains what it does, we might be able to skip
adding doc, especially since it's a private method. The open question may be
where the best place to explain the details is: I've added comments near the
relevant code lines, but I can move them into scaladoc if we prefer.
> This looks way too complicated for a simple check like that.
The method is complicated for two reasons:
1) There's a tricky part in FileStreamSource: it doesn't only match files
whose path matches the source path, it also matches files whose parent
directory matches the source path. So we have to consider both cases: a) the
file itself matches, b) its parent directory matches.
Please refer to the comments in the test code below:
https://github.com/apache/spark/blob/bd8da3799dd160771ebb3ea55b7678b644248425/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala#L736-L778
2) Checking against a glob pattern is costly, so we'd like to avoid it by
leveraging known information where possible. For example, when a file is moved
to the archive directory, the destination path retains the input file's path
as a suffix, so the destination path can't match the source path if the
archive directory's depth is greater than 2. (Neither the file nor the parent
directory of the destination path can match the source path.)
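
To make the two points above concrete, here is a minimal sketch of such a
check. It is not the code in this PR: it assumes plain string paths and uses
`java.nio` glob matchers per path component in place of Hadoop's `GlobFilter`,
and `ArchiveMatchSketch` and `wouldBeReadAsInput` are hypothetical names. The
cheap depth comparison runs before any glob matching.

```scala
import java.nio.file.{FileSystems, Paths}

// Illustrative only: hypothetical names, java.nio globs instead of Hadoop's GlobFilter.
object ArchiveMatchSketch {
  // Split an absolute path such as "/data/*/input" into its components.
  private def components(path: String): Vector[String] =
    path.split('/').filter(_.nonEmpty).toVector

  // Match one path component against one glob component.
  private def matchesGlob(glob: String, name: String): Boolean =
    FileSystems.getDefault.getPathMatcher(s"glob:$glob").matches(Paths.get(name))

  // The depth comparison comes first, so globs are evaluated only when depths agree.
  private def matchesAll(globs: Vector[String], candidate: Vector[String]): Boolean =
    globs.length == candidate.length &&
      globs.zip(candidate).forall { case (g, c) => matchesGlob(g, c) }

  // True if the archived file would be picked up again by the source pattern.
  def wouldBeReadAsInput(sourcePattern: String, archiveFile: String): Boolean = {
    val globs = components(sourcePattern)
    val file = components(archiveFile)
    // Case a) the file itself matches; case b) its parent directory matches.
    matchesAll(globs, file) || (file.nonEmpty && matchesAll(globs, file.init))
  }
}
```

With source pattern `/data/*` and archive dir `/data`, the file `/data/f.txt`
would be archived to `/data/data/f.txt`, whose parent directory matches the
pattern, so the sketch reports `true`. For `/archive/2019/data/f.txt` the
depth comparison alone rules out a match.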