HeartSaVioR commented on a change in pull request #27208: [SPARK-30481][CORE]
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r368274937
##########
File path:
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
##########
@@ -1362,6 +1365,110 @@ class FsHistoryProviderSuite extends SparkFunSuite
with Matchers with Logging {
}
}
+ test("compact event log files") {
+ def verifyEventLogFiles(
+ fs: FileSystem,
+ rootPath: String,
+ expectedIndexForCompact: Option[Long],
+ expectedIndicesForNonCompact: Seq[Long]): Unit = {
+ val reader = EventLogFileReader(fs, new Path(rootPath)).get
+ var logFiles = reader.listEventLogFiles
+
+ expectedIndexForCompact.foreach { idx =>
+ val headFile = logFiles.head
+ assert(EventLogFileWriter.isCompacted(headFile.getPath))
+ assert(idx ==
RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName))
+ logFiles = logFiles.drop(1)
+ }
+
+ assert(logFiles.size === expectedIndicesForNonCompact.size)
+
+ logFiles.foreach { logFile =>
+ assert(RollingEventLogFilesWriter.isEventLogFile(logFile))
+ assert(!EventLogFileWriter.isCompacted(logFile.getPath))
+ }
+
+ val indices = logFiles.map { logFile =>
+
RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName)
+ }
+ assert(expectedIndicesForNonCompact === indices)
+ }
+
+ withTempDir { dir =>
+ val conf = createTestConf()
+ conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+ conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+ conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+ val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+ val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+ val provider = new FsHistoryProvider(conf)
+
+ val writer = new RollingEventLogFilesWriter("app", None, dir.toURI,
conf, hadoopConf)
+ writer.start()
+
+ // writing event log file 1 - don't compact for now
+ writeEventsToRollingWriter(writer, Seq(
+ SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+ SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+ updateAndCheck(provider) { _ =>
+ verifyEventLogFiles(fs, writer.logPath, None, Seq(1))
+ val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+ assert(info.lastIndexToRunCompaction === Some(1))
+ }
+
+ // writing event log file 2 - compact the event log file 1 into 1.compact
+ writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+ writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1),
+ SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false)
+
+ updateAndCheck(provider) { _ =>
+ verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
+ val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+ assert(info.lastIndexToRunCompaction === Some(2))
Review comment:
> From that comment it seems that what you're saying is that, in this case,
the log with index "2" isn't yet compacted, but it's been processed and found
that it's not yet worth it to compact it. Is that right?
Yes, correct, it applies regardless of the result of compaction.
> If my understanding is correct then perhaps a better name is
lastEvaluatedForCompaction (didn't add "index" because the name is already
pretty long as it is).
That's a better name. Thanks! Will address.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]