HeartSaVioR commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367191546
 
 

 ##########
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ##########
 @@ -1362,6 +1365,110 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with Matchers with Logging {
     }
   }
 
+  test("compact event log files") {
+    def verifyEventLogFiles(
+        fs: FileSystem,
+        rootPath: String,
+        expectedIndexForCompact: Option[Long],
+        expectedIndicesForNonCompact: Seq[Long]): Unit = {
+      val reader = EventLogFileReader(fs, new Path(rootPath)).get
+      var logFiles = reader.listEventLogFiles
+
+      expectedIndexForCompact.foreach { idx =>
+        val headFile = logFiles.head
+        assert(EventLogFileWriter.isCompacted(headFile.getPath))
+        assert(idx == 
RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName))
+        logFiles = logFiles.drop(1)
+      }
+
+      assert(logFiles.size === expectedIndicesForNonCompact.size)
+
+      logFiles.foreach { logFile =>
+        assert(RollingEventLogFilesWriter.isEventLogFile(logFile))
+        assert(!EventLogFileWriter.isCompacted(logFile.getPath))
+      }
+
+      val indices = logFiles.map { logFile =>
+        
RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName)
+      }
+      assert(expectedIndicesForNonCompact === indices)
+    }
+
+    withTempDir { dir =>
+      val conf = createTestConf()
+      conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+      conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+      conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val provider = new FsHistoryProvider(conf)
+
+      val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, 
conf, hadoopConf)
+      writer.start()
+
+      // writing event log file 1 - don't compact for now
+      writeEventsToRollingWriter(writer, Seq(
+        SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, None, Seq(1))
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastIndexToRunCompaction === Some(1))
+      }
+
+      // writing event log file 2 - compact the event log file 1 into 1.compact
+      writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+      writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1),
+        SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false)
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastIndexToRunCompaction === Some(2))
 
 Review comment:
   Please refer to the comment 
https://github.com/apache/spark/pull/27208#discussion_r367191271 to see what it 
means. The expected value of 2 here is intentional.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to