Github user squito commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20138#discussion_r161660926
  
    --- Diff: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ---
    @@ -663,6 +665,95 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
         freshUI.get.ui.store.job(0)
       }
     
    +  test("clean up stale app information") {
    +    val storeDir = Utils.createTempDir()
    +    val conf = createTestConf().set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
    +    val provider = spy(new FsHistoryProvider(conf))
    +    val appId = "new1"
    +
    +    // Write logs for two app attempts.
    +    doReturn(1L).when(provider).getNewLastScanTime()
    +    val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
    +    writeFile(attempt1, true, None,
    +      SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("1")),
    +      SparkListenerJobStart(0, 1L, Nil, null),
    +      SparkListenerApplicationEnd(5L)
    +      )
    +    val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
    +    writeFile(attempt2, true, None,
    +      SparkListenerApplicationStart(appId, Some(appId), 1L, "test", Some("2")),
    +      SparkListenerJobStart(0, 1L, Nil, null),
    +      SparkListenerApplicationEnd(5L)
    +      )
    +    updateAndCheck(provider) { list =>
    +      assert(list.size === 1)
    +      assert(list(0).id === appId)
    +      assert(list(0).attempts.size === 2)
    +    }
    +
    +    // Load the app's UI.
    +    val ui = provider.getAppUI(appId, Some("1"))
    +    assert(ui.isDefined)
    +
    +    // Delete the underlying log file for attempt 1 and rescan. The UI should go away, but since
    +    // attempt 2 still exists, listing data should be there.
    +    doReturn(2L).when(provider).getNewLastScanTime()
    +    attempt1.delete()
    +    updateAndCheck(provider) { list =>
    +      assert(list.size === 1)
    +      assert(list(0).id === appId)
    +      assert(list(0).attempts.size === 1)
    +    }
    +    assert(!ui.get.valid)
    +    assert(provider.getAppUI(appId, None) === None)
    +
    +    // Delete the second attempt's log file. Now everything should go away.
    +    doReturn(3L).when(provider).getNewLastScanTime()
    +    attempt2.delete()
    +    updateAndCheck(provider) { list =>
    +      assert(list.isEmpty)
    +    }
    +  }
    +
    +  test("SPARK-21571: clean up removes invalid history files") {
    +    val clock = new ManualClock(TimeUnit.DAYS.toMillis(120))
    +    val conf = createTestConf().set("spark.history.fs.cleaner.maxAge", 
s"2d")
    +    val provider = new FsHistoryProvider(conf, clock) {
    +      override def getNewLastScanTime(): Long = clock.getTimeMillis()
    +    }
    +
    +    // Create 0-byte in-progress and completed log files.
    +    val logfile1 = newLogFile("emptyInprogressLogFile", None, inProgress = true)
    +    logfile1.createNewFile()
    +    logfile1.setLastModified(clock.getTimeMillis())
    +
    +    val logfile2 = newLogFile("emptyFinishedLogFile", None, inProgress = false)
    +    logfile2.createNewFile()
    +    logfile2.setLastModified(clock.getTimeMillis())
    +
    +    // Create an incomplete log file: it has an end record but no start record.
    +    val logfile3 = newLogFile("nonEmptyCorruptLogFile", None, inProgress = false)
    +    writeFile(logfile3, true, None, SparkListenerApplicationEnd(0))
    +    logfile3.setLastModified(clock.getTimeMillis())
    +
    +    provider.checkForLogs()
    +    provider.cleanLogs()
    +    assert(new File(testDir.toURI).listFiles().size === 3)
    +
    +    // Move the clock forward 1 day and scan the files again. They should still be there.
    +    clock.advance(TimeUnit.DAYS.toMillis(1))
    +    provider.checkForLogs()
    +    provider.cleanLogs()
    +    assert(new File(testDir.toURI).listFiles().size === 3)
    +
    +    // Move the clock forward another 2 days and scan the files again. This time the cleaner
    +    // should pick up the invalid files and get rid of them.
    +    clock.advance(TimeUnit.DAYS.toMillis(2))
    +    provider.checkForLogs()
    +    provider.cleanLogs()
    +    assert(new File(testDir.toURI).listFiles().size === 0)
    --- End diff ---
    
    I think you should add a case where one file starts out empty, say even for
    one full day, but then becomes valid before the expiration time, and make
    sure it does *not* get cleaned up.
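    
    Rough, untested sketch of what that case could look like, reusing this
    suite's helpers; the file name, app id, and exact timings are just
    placeholders:
    
    ```scala
    // File starts out 0-byte, like the other invalid-file cases above.
    val logfile4 = newLogFile("lateValidLogFile", None, inProgress = true)
    logfile4.createNewFile()
    logfile4.setLastModified(clock.getTimeMillis())

    provider.checkForLogs()
    provider.cleanLogs()

    // A full day later it gains a valid start event, before the 2d max age expires.
    clock.advance(TimeUnit.DAYS.toMillis(1))
    writeFile(logfile4, true, None,
      SparkListenerApplicationStart("lateValid", Some("lateValid"), 2L, "test", None))
    logfile4.setLastModified(clock.getTimeMillis())

    // Move past the point where the still-empty version would have been cleaned
    // up. The now-valid, in-progress log should survive both scans.
    clock.advance(TimeUnit.DAYS.toMillis(2))
    provider.checkForLogs()
    provider.cleanLogs()
    assert(logfile4.exists())
    ```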

