Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22952#discussion_r237319176
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 ---
    @@ -257,16 +289,65 @@ class FileStreamSource(
        * equal to `end` and will only request offsets greater than `end` in 
the future.
        */
       override def commit(end: Offset): Unit = {
    -    // No-op for now; FileStreamSource currently garbage-collects files 
based on timestamp
    -    // and the value of the maxFileAge parameter.
    +    def move(entry: FileEntry, baseArchiveDirPath: String): Unit = {
    +      val curPath = new Path(entry.path)
    +      val curPathUri = curPath.toUri
    +
    +      val newPath = new Path(baseArchiveDirPath + curPathUri.getPath)
    +      try {
    +        logDebug(s"Creating directory if it doesn't exist 
${newPath.getParent}")
    +        if (!fs.exists(newPath.getParent)) {
    +          fs.mkdirs(newPath.getParent)
    +        }
    +
    +        logDebug(s"Archiving completed file $curPath to $newPath")
    +        fs.rename(curPath, newPath)
    +      } catch {
    +        case NonFatal(e) =>
    +          // Log to error but swallow exception to avoid process being 
stopped
    +          logWarning(s"Fail to move $curPath to $newPath / skip moving 
file.", e)
    +      }
    +    }
    +
    +    def remove(entry: FileEntry): Unit = {
    +      val curPath = new Path(entry.path)
    +      try {
    +        logDebug(s"Removing completed file $curPath")
    +        fs.delete(curPath, false)
    +      } catch {
    +        case NonFatal(e) =>
    +          // Log to error but swallow exception to avoid process being 
stopped
    +          logWarning(s"Fail to remove $curPath / skip removing file.", e)
    +      }
    +    }
    +
    +    val logOffset = FileStreamSourceOffset(end).logOffset
    +    metadataLog.get(logOffset) match {
    --- End diff --
    
    You can use `val files = metadataLog.get(Some(logOffset), Some(logOffset)).flatMap(_._2)`
    here to take advantage of the underlying cache in `FileStreamSourceLog`.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to