vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r368088887
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ##########
 @@ -1362,6 +1365,110 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with Matchers with Logging {
     }
   }
 
+  test("compact event log files") {
+    def verifyEventLogFiles(
+        fs: FileSystem,
+        rootPath: String,
+        expectedIndexForCompact: Option[Long],
+        expectedIndicesForNonCompact: Seq[Long]): Unit = {
+      val reader = EventLogFileReader(fs, new Path(rootPath)).get
+      var logFiles = reader.listEventLogFiles
+
+      expectedIndexForCompact.foreach { idx =>
+        val headFile = logFiles.head
+        assert(EventLogFileWriter.isCompacted(headFile.getPath))
+        assert(idx == 
RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName))
+        logFiles = logFiles.drop(1)
+      }
+
+      assert(logFiles.size === expectedIndicesForNonCompact.size)
+
+      logFiles.foreach { logFile =>
+        assert(RollingEventLogFilesWriter.isEventLogFile(logFile))
+        assert(!EventLogFileWriter.isCompacted(logFile.getPath))
+      }
+
+      val indices = logFiles.map { logFile =>
+        
RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName)
+      }
+      assert(expectedIndicesForNonCompact === indices)
+    }
+
+    withTempDir { dir =>
+      val conf = createTestConf()
+      conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+      conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+      conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val provider = new FsHistoryProvider(conf)
+
+      val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, 
conf, hadoopConf)
+      writer.start()
+
+      // writing event log file 1 - don't compact for now
+      writeEventsToRollingWriter(writer, Seq(
+        SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, None, Seq(1))
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastIndexToRunCompaction === Some(1))
+      }
+
+      // writing event log file 2 - compact the event log file 1 into 1.compact
+      writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+      writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1),
+        SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false)
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastIndexToRunCompaction === Some(2))
 
 Review comment:
   From that comment it seems that what you're saying is that, in this case, 
the log with index "2" isn't yet compacted, but it's been processed and found 
that it's not yet worth it to compact it. Is that right?
   
   I think the variable name is confusing because it mixes "last" (which to me 
evokes something that has happened already) and "to run" (which evokes a future 
action). So I can't really parse the meaning of the name since it seems to 
contradict itself.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to