HeartSaVioR commented on a change in pull request #27208: [SPARK-30481][CORE]
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r371577205
##########
File path:
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
##########
@@ -1362,6 +1365,110 @@ class FsHistoryProviderSuite extends SparkFunSuite
with Matchers with Logging {
}
}
+ test("compact event log files") {
+ def verifyEventLogFiles(
+ fs: FileSystem,
+ rootPath: String,
+ expectedIndexForCompact: Option[Long],
+ expectedIndicesForNonCompact: Seq[Long]): Unit = {
+ val reader = EventLogFileReader(fs, new Path(rootPath)).get
+ var logFiles = reader.listEventLogFiles
+
+ expectedIndexForCompact.foreach { idx =>
+ val headFile = logFiles.head
+ assert(EventLogFileWriter.isCompacted(headFile.getPath))
+ assert(idx ==
RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName))
+ logFiles = logFiles.drop(1)
+ }
+
+ assert(logFiles.size === expectedIndicesForNonCompact.size)
+
+ logFiles.foreach { logFile =>
+ assert(RollingEventLogFilesWriter.isEventLogFile(logFile))
+ assert(!EventLogFileWriter.isCompacted(logFile.getPath))
+ }
+
+ val indices = logFiles.map { logFile =>
+
RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName)
+ }
+ assert(expectedIndicesForNonCompact === indices)
+ }
+
+ withTempDir { dir =>
+ val conf = createTestConf()
+ conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+ conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+ conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+ val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+ val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+ val provider = new FsHistoryProvider(conf)
+
+ val writer = new RollingEventLogFilesWriter("app", None, dir.toURI,
conf, hadoopConf)
+ writer.start()
+
+ // writing event log file 1 - don't compact for now
+ writeEventsToRollingWriter(writer, Seq(
+ SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+ SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+ updateAndCheck(provider) { _ =>
+ verifyEventLogFiles(fs, writer.logPath, None, Seq(1))
+ val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+ assert(info.lastEvaluatedForCompaction === Some(1))
+ }
+
+ // writing event log file 2 - compact the event log file 1 into 1.compact
+ writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+ writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1),
+ SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false)
+
+ updateAndCheck(provider) { _ =>
+ verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
+ val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+ assert(info.lastEvaluatedForCompaction === Some(2))
+ }
+
+ // writing event log file 3 - compact two files - 1.compact & 2 into
one, 2.compact
+ writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+ writeEventsToRollingWriter(writer, Seq(
+ SparkListenerExecutorAdded(3, "exec1", new ExecutorInfo("host1", 1,
Map.empty)),
+ SparkListenerJobStart(2, 4, Seq.empty),
+ SparkListenerJobEnd(2, 5, JobSucceeded)), rollFile = false)
+
+ writer.stop()
+
+ updateAndCheck(provider) { _ =>
+ verifyEventLogFiles(fs, writer.logPath, Some(2), Seq(3))
+
+ val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+ assert(info.lastEvaluatedForCompaction === Some(3))
+
+ val store = new InMemoryStore
+ val appStore = new AppStatusStore(store)
+
+ val reader = EventLogFileReader(fs, new Path(writer.logPath)).get
+ provider.rebuildAppStore(store, reader, 0L)
+
+ // replayed store doesn't have any job, as events for job are removed
while compacting
+ intercept[NoSuchElementException] {
+ appStore.job(1)
+ }
+
+ // but other events should be available even they were in original
files to compact
+ val appInfo = appStore.applicationInfo()
+ assert(appInfo.id === "app")
+ assert(appInfo.name === "app")
+
+ // all events in retained file should be available, even they're
related to finished jobs
 Review comment:
Uh, even though the latter verification code technically checks the events for
executors (as it's relatively easier to check with AppStatusStore), the comment
is still correct — we don't filter out any events in the "retained" file(s). I'm
refining the comment a bit to clarify that these events will include removed
executors as well.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]