HyukjinKwon commented on a change in pull request #27557:
URL: https://github.com/apache/spark/pull/27557#discussion_r413027638



##########
File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -177,16 +178,35 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : 
ClassTag](
    * corresponding `batchId` file. It will delete expired files as well if 
enabled.
    */
   private def compact(batchId: Long, logs: Array[T]): Boolean = {
-    val validBatches = getValidBatchesBeforeCompactionBatch(batchId, 
compactInterval)
-    val allLogs = validBatches.flatMap { id =>
-      super.get(id).getOrElse {
-        throw new IllegalStateException(
-          s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId 
" +
-            s"(compactInterval: $compactInterval)")
-      }
-    } ++ logs
+    val (allLogs, loadElapsedMs) = Utils.timeTakenMs {
+      val validBatches = getValidBatchesBeforeCompactionBatch(batchId, 
compactInterval)
+      validBatches.flatMap { id =>
+        super.get(id).getOrElse {
+          throw new IllegalStateException(
+            s"${batchIdToPath(id)} doesn't exist when compacting batch 
$batchId " +
+              s"(compactInterval: $compactInterval)")
+        }
+      } ++ logs
+    }
+    val compactedLogs = compactLogs(allLogs)
+
     // Return false as there is another writer.
-    super.add(batchId, compactLogs(allLogs).toArray)
+    val (writeSucceed, writeElapsedMs) = Utils.timeTakenMs {
+      super.add(batchId, compactedLogs.toArray)
+    }
+
+    val elapsedMs = loadElapsedMs + writeElapsedMs
+    if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) {
+      logWarning(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
+        s" write: $writeElapsedMs ms) for compact batch $batchId")
+      logWarning(s"Loaded ${allLogs.size} entries 
(${SizeEstimator.estimate(allLogs)} bytes in " +

Review comment:
       Should we clarify in the log message that this size is an estimate?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to