HeartSaVioR commented on a change in pull request #28904:
URL: https://github.com/apache/spark/pull/28904#discussion_r444050433



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -222,21 +256,22 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
         try {
           val logs =
             getAllValidBatches(latestId, compactInterval).flatMap { id =>
-              super.get(id).getOrElse {
+              filterInBatch(id)(shouldRetain).getOrElse {

Review comment:
       This would help when we introduce a new condition on exclusion of 
entries.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
##########
@@ -97,18 +97,15 @@ class FileStreamSinkLog(
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was 
$defaultCompactInterval) " +
       "to a positive value.")
 
-  override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
-    val deletedFiles = logs.filter(_.action == 
FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
-    if (deletedFiles.isEmpty) {
-      logs
-    } else {
-      logs.filter(f => !deletedFiles.contains(f.path))
-    }
+  override def shouldRetain(log: SinkFileStatus): Boolean = {
+    log.action != FileStreamSinkLog.DELETE_ACTION

Review comment:
       While I just keep this, I think we should just remove this. As I left 
TODO below, it hasn't been used, exists hypothetically.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -212,7 +246,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
   /**
    * Returns all files except the deleted ones.
    */
-  def allFiles(): Array[T] = {
+  def allFiles(predicate: T => Boolean = _ => true): Array[T] = {

Review comment:
       We can also have a streamlined version of this method to avoid 
materializing all entries on initializing FileStreamSource, though I think 
there's the another problem we should solve (file stream source log should not 
have bunch of entries - think about other data sources) and once we fixed that 
issue it won't matter at all.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -106,10 +106,8 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
     interval
   }
 
-  /**
-   * Filter out the obsolete logs.
-   */
-  def compactLogs(logs: Seq[T]): Seq[T]
+  /** Determine whether the log should be retained or not. */
+  def shouldRetain(log: T): Boolean

Review comment:
       I just retained the functionality of exclusion, as we still have a 
chance to apply retention which is applied per entry.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -222,21 +256,22 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
         try {
           val logs =
             getAllValidBatches(latestId, compactInterval).flatMap { id =>
-              super.get(id).getOrElse {
+              filterInBatch(id)(shouldRetain).getOrElse {
                 throw new IllegalStateException(
                   s"${batchIdToPath(id)} doesn't exist " +
                     s"(latestId: $latestId, compactInterval: 
$compactInterval)")
               }
             }
-          return compactLogs(logs).toArray
+          return logs.toArray
         } catch {
           case e: IOException =>
             // Another process using `CompactibleFileStreamLog` may delete the 
batch files when
             // `StreamFileIndex` are reading. However, it only happens when a 
compaction is
             // deleting old files. If so, let's try the next compaction batch 
and we should find it.
             // Otherwise, this is a real IO issue and we should throw it.
-            latestId = nextCompactionBatchId(latestId, compactInterval)
-            super.get(latestId).getOrElse {
+            val expectedMinLatestId = nextCompactionBatchId(latestId, 
compactInterval)

Review comment:
       This new approach is to avoid reading the next compact file log, which 
materializes all entries into the file.

##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
##########
@@ -97,18 +97,15 @@ class FileStreamSinkLog(
     s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was 
$defaultCompactInterval) " +
       "to a positive value.")
 
-  override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
-    val deletedFiles = logs.filter(_.action == 
FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
-    if (deletedFiles.isEmpty) {
-      logs
-    } else {
-      logs.filter(f => !deletedFiles.contains(f.path))
-    }
+  override def shouldRetain(log: SinkFileStatus): Boolean = {
+    log.action != FileStreamSinkLog.DELETE_ACTION
   }
 }
 
 object FileStreamSinkLog {
   val VERSION = 1
+  // TODO: This action hasn't been used from the introduction. We should just 
remove this.
+  // TODO: We can remove the field "action" as well, ignoring "action" in 
existing metadata log.

Review comment:
       Note that this would also help to reduce the size of each entry. OK to 
leave only ADD_ACTION if JSON serializer/deserializer complains.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to