xuanyuanking commented on a change in pull request #27557: [SPARK-30804][SS]
Measure and log elapsed time for "compact" operation in CompactibleFileStreamLog
URL: https://github.com/apache/spark/pull/27557#discussion_r382560001
##########
File path:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/CompactibleFileStreamLog.scala
##########
@@ -177,16 +178,35 @@ abstract class CompactibleFileStreamLog[T <: AnyRef :
ClassTag](
* corresponding `batchId` file. It will delete expired files as well if
enabled.
*/
private def compact(batchId: Long, logs: Array[T]): Boolean = {
- val validBatches = getValidBatchesBeforeCompactionBatch(batchId,
compactInterval)
- val allLogs = validBatches.flatMap { id =>
- super.get(id).getOrElse {
- throw new IllegalStateException(
- s"${batchIdToPath(id)} doesn't exist when compacting batch $batchId
" +
- s"(compactInterval: $compactInterval)")
- }
- } ++ logs
+ val (allLogs, loadElapsedMs) = Utils.timeTakenMs {
+ val validBatches = getValidBatchesBeforeCompactionBatch(batchId,
compactInterval)
+ validBatches.flatMap { id =>
+ super.get(id).getOrElse {
+ throw new IllegalStateException(
+ s"${batchIdToPath(id)} doesn't exist when compacting batch
$batchId " +
+ s"(compactInterval: $compactInterval)")
+ }
+ } ++ logs
+ }
+ val compactedLogs = compactLogs(allLogs)
+
// Return false as there is another writer.
- super.add(batchId, compactLogs(allLogs).toArray)
+ val (writeSucceed, writeElapsedMs) = Utils.timeTakenMs {
+ super.add(batchId, compactedLogs.toArray)
+ }
+
+ val elapsedMs = loadElapsedMs + writeElapsedMs
+ if (elapsedMs >= COMPACT_LATENCY_WARN_THRESHOLD_MS) {
+ logWarning(s"Compacting took $elapsedMs ms (load: $loadElapsedMs ms," +
Review comment:
nit: it seems these two log statements could be combined into one?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]