[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-19 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20138


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-17 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r162160545
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
@@ -663,6 +665,95 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with BeforeAndAfter with Matc
 freshUI.get.ui.store.job(0)
   }
 
+  test("clean up stale app information") {
+val storeDir = Utils.createTempDir()
+val conf = createTestConf().set(LOCAL_STORE_DIR, 
storeDir.getAbsolutePath())
+val provider = spy(new FsHistoryProvider(conf))
+val appId = "new1"
+
+// Write logs for two app attempts.
+doReturn(1L).when(provider).getNewLastScanTime()
+val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
+writeFile(attempt1, true, None,
+  SparkListenerApplicationStart(appId, Some(appId), 1L, "test", 
Some("1")),
+  SparkListenerJobStart(0, 1L, Nil, null),
+  SparkListenerApplicationEnd(5L)
+  )
+val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
+writeFile(attempt2, true, None,
+  SparkListenerApplicationStart(appId, Some(appId), 1L, "test", 
Some("2")),
+  SparkListenerJobStart(0, 1L, Nil, null),
+  SparkListenerApplicationEnd(5L)
+  )
+updateAndCheck(provider) { list =>
+  assert(list.size === 1)
+  assert(list(0).id === appId)
+  assert(list(0).attempts.size === 2)
+}
+
+// Load the app's UI.
+val ui = provider.getAppUI(appId, Some("1"))
+assert(ui.isDefined)
+
+// Delete the underlying log file for attempt 1 and rescan. The UI 
should go away, but since
+// attempt 2 still exists, listing data should be there.
+doReturn(2L).when(provider).getNewLastScanTime()
+attempt1.delete()
+updateAndCheck(provider) { list =>
+  assert(list.size === 1)
+  assert(list(0).id === appId)
+  assert(list(0).attempts.size === 1)
+}
+assert(!ui.get.valid)
+assert(provider.getAppUI(appId, None) === None)
+
+// Delete the second attempt's log file. Now everything should go away.
+doReturn(3L).when(provider).getNewLastScanTime()
+attempt2.delete()
+updateAndCheck(provider) { list =>
+  assert(list.isEmpty)
+}
+  }
+
+  test("SPARK-21571: clean up removes invalid history files") {
+val clock = new ManualClock(TimeUnit.DAYS.toMillis(120))
--- End diff --

FYI #20284 fixes the underlying bug.


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-16 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161882635
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
@@ -663,6 +665,95 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with BeforeAndAfter with Matc
 freshUI.get.ui.store.job(0)
   }
 
+  test("clean up stale app information") {
+val storeDir = Utils.createTempDir()
+val conf = createTestConf().set(LOCAL_STORE_DIR, 
storeDir.getAbsolutePath())
+val provider = spy(new FsHistoryProvider(conf))
+val appId = "new1"
+
+// Write logs for two app attempts.
+doReturn(1L).when(provider).getNewLastScanTime()
+val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
+writeFile(attempt1, true, None,
+  SparkListenerApplicationStart(appId, Some(appId), 1L, "test", 
Some("1")),
+  SparkListenerJobStart(0, 1L, Nil, null),
+  SparkListenerApplicationEnd(5L)
+  )
+val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
+writeFile(attempt2, true, None,
+  SparkListenerApplicationStart(appId, Some(appId), 1L, "test", 
Some("2")),
+  SparkListenerJobStart(0, 1L, Nil, null),
+  SparkListenerApplicationEnd(5L)
+  )
+updateAndCheck(provider) { list =>
+  assert(list.size === 1)
+  assert(list(0).id === appId)
+  assert(list(0).attempts.size === 2)
+}
+
+// Load the app's UI.
+val ui = provider.getAppUI(appId, Some("1"))
+assert(ui.isDefined)
+
+// Delete the underlying log file for attempt 1 and rescan. The UI 
should go away, but since
+// attempt 2 still exists, listing data should be there.
+doReturn(2L).when(provider).getNewLastScanTime()
+attempt1.delete()
+updateAndCheck(provider) { list =>
+  assert(list.size === 1)
+  assert(list(0).id === appId)
+  assert(list(0).attempts.size === 1)
+}
+assert(!ui.get.valid)
+assert(provider.getAppUI(appId, None) === None)
+
+// Delete the second attempt's log file. Now everything should go away.
+doReturn(3L).when(provider).getNewLastScanTime()
+attempt2.delete()
+updateAndCheck(provider) { list =>
+  assert(list.isEmpty)
+}
+  }
+
+  test("SPARK-21571: clean up removes invalid history files") {
+val clock = new ManualClock(TimeUnit.DAYS.toMillis(120))
--- End diff --

This is because of this line:

    val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000

If the clock started at zero, `maxTime` would be negative, and that seems to trigger a 
bug somewhere else. I need to take a look at exactly what's happening there, 
but it seems unrelated to this change.
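
To make the arithmetic concrete, a quick sketch (plain Scala, values taken from the test setup above; purely illustrative):

    import java.util.concurrent.TimeUnit

    val maxLogAgeMs = TimeUnit.DAYS.toMillis(2)    // spark.history.fs.cleaner.maxAge = 2d in the test
    // If the ManualClock started at 0, the cutoff would be negative:
    val cutoffAtZero = 0L - maxLogAgeMs                              // -172800000
    // Starting at 120 days leaves plenty of headroom:
    val cutoffAt120d = TimeUnit.DAYS.toMillis(120) - maxLogAgeMs     // 10195200000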


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-16 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161880321
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 try {
   val newLastScanTime = getNewLastScanTime()
   logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-  // scan for modified applications, replay and merge them
-  val logInfos = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
+
+  val updated = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
 .filter { entry =>
   !entry.isDirectory() &&
 // FsHistoryProvider generates a hidden file which can't be 
read.  Accidentally
 // reading a garbage file is safe, but we would log an error 
which can be scary to
 // the end-user.
 !entry.getPath().getName().startsWith(".") &&
-SparkHadoopUtil.get.checkAccessPermission(entry, 
FsAction.READ) &&
-recordedFileSize(entry.getPath()) < entry.getLen()
+SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+}
+.filter { entry =>
+  try {
+val info = listing.read(classOf[LogInfo], 
entry.getPath().toString())
+if (info.fileSize < entry.getLen()) {
+  // Log size has changed, it should be parsed.
+  true
+} else {
+  // If the SHS view has a valid application, update the time 
the file was last seen so
+  // that the entry is not deleted from the SHS listing.
+  if (info.appId.isDefined) {
+listing.write(info.copy(lastProcessed = newLastScanTime))
+  }
+  false
+}
+  } catch {
+case _: NoSuchElementException =>
+  // If the file is currently not being tracked by the SHS, 
add an entry for it and try
+  // to parse it. This will allow the cleaner code to detect 
the file as stale later on
+  // if it was not possible to parse it.
+  listing.write(LogInfo(entry.getPath().toString(), 
newLastScanTime, None, None,
+entry.getLen()))
+  entry.getLen() > 0
+  }
 }
 .sortWith { case (entry1, entry2) =>
   entry1.getModificationTime() > entry2.getModificationTime()
 }
 
-  if (logInfos.nonEmpty) {
-logDebug(s"New/updated attempts found: ${logInfos.size} 
${logInfos.map(_.getPath)}")
+  if (updated.nonEmpty) {
+logDebug(s"New/updated attempts found: ${updated.size} 
${updated.map(_.getPath)}")
   }
 
-  var tasks = mutable.ListBuffer[Future[_]]()
-
-  try {
-for (file <- logInfos) {
-  tasks += replayExecutor.submit(new Runnable {
-override def run(): Unit = mergeApplicationListing(file)
+  val tasks = updated.map { entry =>
+try {
+  replayExecutor.submit(new Runnable {
+override def run(): Unit = mergeApplicationListing(entry, 
newLastScanTime)
   })
+} catch {
+  // let the iteration over logInfos break, since an exception on
--- End diff --

Actually `RejectedExecutionException` shouldn't ever be thrown here. The 
executor doesn't have a bounded queue, and it's very unlikely you'll ever 
submit `Integer.MAX_VALUE` tasks here.

The code didn't use to catch any exception here (it was added along with 
the comment in a531fe1). Catching the exception doesn't do any harm; I just 
don't think this code path will ever trigger.
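
For reference, a minimal sketch (not the SHS code) of when `submit` actually throws `RejectedExecutionException` with an unbounded-queue executor — essentially only after shutdown:

    import java.util.concurrent.{Executors, RejectedExecutionException}

    val pool = Executors.newFixedThreadPool(1)  // backed by an unbounded LinkedBlockingQueue
    pool.submit(new Runnable { override def run(): Unit = () })  // accepted; the queue just grows

    pool.shutdown()
    try {
      pool.submit(new Runnable { override def run(): Unit = () })  // rejected now
    } catch {
      case _: RejectedExecutionException =>
        println("rejected only because the pool was shut down")
    }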


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-15 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161660926
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
@@ -663,6 +665,95 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with BeforeAndAfter with Matc
 freshUI.get.ui.store.job(0)
   }
 
+  test("clean up stale app information") {
+val storeDir = Utils.createTempDir()
+val conf = createTestConf().set(LOCAL_STORE_DIR, 
storeDir.getAbsolutePath())
+val provider = spy(new FsHistoryProvider(conf))
+val appId = "new1"
+
+// Write logs for two app attempts.
+doReturn(1L).when(provider).getNewLastScanTime()
+val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
+writeFile(attempt1, true, None,
+  SparkListenerApplicationStart(appId, Some(appId), 1L, "test", 
Some("1")),
+  SparkListenerJobStart(0, 1L, Nil, null),
+  SparkListenerApplicationEnd(5L)
+  )
+val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
+writeFile(attempt2, true, None,
+  SparkListenerApplicationStart(appId, Some(appId), 1L, "test", 
Some("2")),
+  SparkListenerJobStart(0, 1L, Nil, null),
+  SparkListenerApplicationEnd(5L)
+  )
+updateAndCheck(provider) { list =>
+  assert(list.size === 1)
+  assert(list(0).id === appId)
+  assert(list(0).attempts.size === 2)
+}
+
+// Load the app's UI.
+val ui = provider.getAppUI(appId, Some("1"))
+assert(ui.isDefined)
+
+// Delete the underlying log file for attempt 1 and rescan. The UI 
should go away, but since
+// attempt 2 still exists, listing data should be there.
+doReturn(2L).when(provider).getNewLastScanTime()
+attempt1.delete()
+updateAndCheck(provider) { list =>
+  assert(list.size === 1)
+  assert(list(0).id === appId)
+  assert(list(0).attempts.size === 1)
+}
+assert(!ui.get.valid)
+assert(provider.getAppUI(appId, None) === None)
+
+// Delete the second attempt's log file. Now everything should go away.
+doReturn(3L).when(provider).getNewLastScanTime()
+attempt2.delete()
+updateAndCheck(provider) { list =>
+  assert(list.isEmpty)
+}
+  }
+
+  test("SPARK-21571: clean up removes invalid history files") {
+val clock = new ManualClock(TimeUnit.DAYS.toMillis(120))
+val conf = createTestConf().set("spark.history.fs.cleaner.maxAge", 
s"2d")
+val provider = new FsHistoryProvider(conf, clock) {
+  override def getNewLastScanTime(): Long = clock.getTimeMillis()
+}
+
+// Create 0-byte size inprogress and complete files
+val logfile1 = newLogFile("emptyInprogressLogFile", None, inProgress = 
true)
+logfile1.createNewFile()
+logfile1.setLastModified(clock.getTimeMillis())
+
+val logfile2 = newLogFile("emptyFinishedLogFile", None, inProgress = 
false)
+logfile2.createNewFile()
+logfile2.setLastModified(clock.getTimeMillis())
+
+// Create an incomplete log file, has an end record but no start 
record.
+val logfile3 = newLogFile("nonEmptyCorruptLogFile", None, inProgress = 
false)
+writeFile(logfile3, true, None, SparkListenerApplicationEnd(0))
+logfile3.setLastModified(clock.getTimeMillis())
+
+provider.checkForLogs()
+provider.cleanLogs()
+assert(new File(testDir.toURI).listFiles().size === 3)
+
+// Move the clock forward 1 day and scan the files again. They should 
still be there.
+clock.advance(TimeUnit.DAYS.toMillis(1))
+provider.checkForLogs()
+provider.cleanLogs()
+assert(new File(testDir.toURI).listFiles().size === 3)
+
+// Move the clock forward another 2 days and scan the files again. 
This time the cleaner should
+// pick up the invalid files and get rid of them.
+clock.advance(TimeUnit.DAYS.toMillis(2))
+provider.checkForLogs()
+provider.cleanLogs()
+assert(new File(testDir.toURI).listFiles().size === 0)
--- End diff --

I think you should add a case where one file starts out empty, say even for 
one full day, but then becomes valid before the expiration time, and make sure 
it does *not* get cleaned up.
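
Something along these lines, reusing the helpers already visible in the diff (`newLogFile`, `writeFile`, the `ManualClock`); the file name and exact timings below are illustrative, not a finished test:

    val laterValid = newLogFile("laterValidLogFile", None, inProgress = true)
    laterValid.createNewFile()
    laterValid.setLastModified(clock.getTimeMillis())

    provider.checkForLogs()
    provider.cleanLogs()

    // The file stays empty for a full day...
    clock.advance(TimeUnit.DAYS.toMillis(1))
    provider.checkForLogs()
    provider.cleanLogs()

    // ...then becomes a valid log before the 2-day expiration would hit.
    writeFile(laterValid, true, None,
      SparkListenerApplicationStart("laterValid", Some("laterValid"), 2L, "test", None),
      SparkListenerApplicationEnd(3L))
    laterValid.setLastModified(clock.getTimeMillis())

    // Past the point where the file would have expired had it stayed empty.
    clock.advance(TimeUnit.DAYS.toMillis(1) + TimeUnit.HOURS.toMillis(12))
    provider.checkForLogs()
    provider.cleanLogs()

    // The now-valid log must *not* have been cleaned up.
    assert(laterValid.exists())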


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-14 Thread jiangxb1987
Github user jiangxb1987 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161443974
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 try {
   val newLastScanTime = getNewLastScanTime()
   logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-  // scan for modified applications, replay and merge them
-  val logInfos = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
+
+  val updated = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
 .filter { entry =>
   !entry.isDirectory() &&
 // FsHistoryProvider generates a hidden file which can't be 
read.  Accidentally
 // reading a garbage file is safe, but we would log an error 
which can be scary to
 // the end-user.
 !entry.getPath().getName().startsWith(".") &&
-SparkHadoopUtil.get.checkAccessPermission(entry, 
FsAction.READ) &&
-recordedFileSize(entry.getPath()) < entry.getLen()
+SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+}
+.filter { entry =>
+  try {
+val info = listing.read(classOf[LogInfo], 
entry.getPath().toString())
+if (info.fileSize < entry.getLen()) {
+  // Log size has changed, it should be parsed.
+  true
+} else {
+  // If the SHS view has a valid application, update the time 
the file was last seen so
+  // that the entry is not deleted from the SHS listing.
+  if (info.appId.isDefined) {
+listing.write(info.copy(lastProcessed = newLastScanTime))
+  }
+  false
+}
+  } catch {
+case _: NoSuchElementException =>
+  // If the file is currently not being tracked by the SHS, 
add an entry for it and try
+  // to parse it. This will allow the cleaner code to detect 
the file as stale later on
+  // if it was not possible to parse it.
+  listing.write(LogInfo(entry.getPath().toString(), 
newLastScanTime, None, None,
+entry.getLen()))
+  entry.getLen() > 0
+  }
 }
 .sortWith { case (entry1, entry2) =>
   entry1.getModificationTime() > entry2.getModificationTime()
 }
 
-  if (logInfos.nonEmpty) {
-logDebug(s"New/updated attempts found: ${logInfos.size} 
${logInfos.map(_.getPath)}")
+  if (updated.nonEmpty) {
+logDebug(s"New/updated attempts found: ${updated.size} 
${updated.map(_.getPath)}")
   }
 
-  var tasks = mutable.ListBuffer[Future[_]]()
-
-  try {
-for (file <- logInfos) {
-  tasks += replayExecutor.submit(new Runnable {
-override def run(): Unit = mergeApplicationListing(file)
+  val tasks = updated.map { entry =>
+try {
+  replayExecutor.submit(new Runnable {
+override def run(): Unit = mergeApplicationListing(entry, 
newLastScanTime)
   })
+} catch {
+  // let the iteration over logInfos break, since an exception on
--- End diff --

Maybe we should handle `RejectedExecutionException` explicitly: when it is thrown, 
we can log an error message and stop submitting the remaining tasks.
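
A rough sketch of what that could look like, assuming the surrounding names from the diff (`updated`, `replayExecutor`, `mergeApplicationListing`, `newLastScanTime`) and Spark's `logError`:

    import java.util.concurrent.RejectedExecutionException

    var stopSubmitting = false
    val tasks = updated.flatMap { entry =>
      if (stopSubmitting) {
        None
      } else {
        try {
          Some(replayExecutor.submit(new Runnable {
            override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
          }))
        } catch {
          case e: RejectedExecutionException =>
            // Log once and skip the remaining entries instead of retrying each one.
            logError("Replay executor rejected a task; stopping this scan.", e)
            stopSubmitting = true
            None
        }
      }
    }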


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161099778
  
--- Diff: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ---
@@ -663,6 +665,95 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with BeforeAndAfter with Matc
 freshUI.get.ui.store.job(0)
   }
 
+  test("clean up stale app information") {
+val storeDir = Utils.createTempDir()
+val conf = createTestConf().set(LOCAL_STORE_DIR, 
storeDir.getAbsolutePath())
+val provider = spy(new FsHistoryProvider(conf))
+val appId = "new1"
+
+// Write logs for two app attempts.
+doReturn(1L).when(provider).getNewLastScanTime()
+val attempt1 = newLogFile(appId, Some("1"), inProgress = false)
+writeFile(attempt1, true, None,
+  SparkListenerApplicationStart(appId, Some(appId), 1L, "test", 
Some("1")),
+  SparkListenerJobStart(0, 1L, Nil, null),
+  SparkListenerApplicationEnd(5L)
+  )
+val attempt2 = newLogFile(appId, Some("2"), inProgress = false)
+writeFile(attempt2, true, None,
+  SparkListenerApplicationStart(appId, Some(appId), 1L, "test", 
Some("2")),
+  SparkListenerJobStart(0, 1L, Nil, null),
+  SparkListenerApplicationEnd(5L)
+  )
+updateAndCheck(provider) { list =>
+  assert(list.size === 1)
+  assert(list(0).id === appId)
+  assert(list(0).attempts.size === 2)
+}
+
+// Load the app's UI.
+val ui = provider.getAppUI(appId, Some("1"))
+assert(ui.isDefined)
+
+// Delete the underlying log file for attempt 1 and rescan. The UI 
should go away, but since
+// attempt 2 still exists, listing data should be there.
+doReturn(2L).when(provider).getNewLastScanTime()
+attempt1.delete()
+updateAndCheck(provider) { list =>
+  assert(list.size === 1)
+  assert(list(0).id === appId)
+  assert(list(0).attempts.size === 1)
+}
+assert(!ui.get.valid)
+assert(provider.getAppUI(appId, None) === None)
+
+// Delete the second attempt's log file. Now everything should go away.
+doReturn(3L).when(provider).getNewLastScanTime()
+attempt2.delete()
+updateAndCheck(provider) { list =>
+  assert(list.isEmpty)
+}
+  }
+
+  test("SPARK-21571: clean up removes invalid history files") {
+val clock = new ManualClock(TimeUnit.DAYS.toMillis(120))
--- End diff --

just curious, why start at 120 days?  (not that it matters ...)


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161099310
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -834,6 +906,9 @@ private[history] case class FsHistoryProviderMetadata(
 
 private[history] case class LogInfo(
 @KVIndexParam logPath: String,
+@KVIndexParam("lastProcessed") lastProcessed: Long,
+appId: Option[String],
--- End diff --

Also add a comment here explaining why `appId` is an `Option`, as that is unexpected.
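
One possible wording (a sketch only; the remaining fields are taken from the `LogInfo(...)` constructor calls elsewhere in the diff):

    private[history] case class LogInfo(
        @KVIndexParam logPath: String,
        @KVIndexParam("lastProcessed") lastProcessed: Long,
        // None until the log has been successfully parsed; entries that still have
        // no appId once they are older than the max log age are considered invalid
        // and are deleted by the cleaner.
        appId: Option[String],
        attemptId: Option[String],
        fileSize: Long)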


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161098356
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -544,73 +621,75 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 bus.addListener(listener)
 replay(fileStatus, bus, eventsFilter = eventsFilter)
 
-listener.applicationInfo.foreach { app =>
-  // Invalidate the existing UI for the reloaded app attempt, if any. 
See LoadedAppUI for a
-  // discussion on the UI lifecycle.
-  synchronized {
-activeUIs.get((app.info.id, 
app.attempts.head.info.attemptId)).foreach { ui =>
-  ui.invalidate()
-  ui.ui.store.close()
+val (appId, attemptId) = listener.applicationInfo match {
+  case Some(app) =>
+// Invalidate the existing UI for the reloaded app attempt, if 
any. See LoadedAppUI for a
+// discussion on the UI lifecycle.
+synchronized {
+  activeUIs.get((app.info.id, 
app.attempts.head.info.attemptId)).foreach { ui =>
+ui.invalidate()
+ui.ui.store.close()
+  }
 }
-  }
 
-  addListing(app)
+addListing(app)
+(Some(app.info.id), app.attempts.head.info.attemptId)
+
+  case _ =>
+(None, None)
--- End diff --

I think a comment here would help, explaining that writing an entry with no `appId` 
marks this log file as eligible for automatic cleanup if it's still in that 
state after the max log age. (If I understood correctly.)
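
Something like the following, for example (the wording is just a suggestion):

    case _ =>
      // No application info could be parsed from the log. Record the entry with no
      // appId so the cleaner can treat the file as invalid and delete it if it is
      // still in this state after spark.history.fs.cleaner.maxAge.
      (None, None)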


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161095561
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 try {
   val newLastScanTime = getNewLastScanTime()
   logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-  // scan for modified applications, replay and merge them
-  val logInfos = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
+
+  val updated = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
 .filter { entry =>
   !entry.isDirectory() &&
 // FsHistoryProvider generates a hidden file which can't be 
read.  Accidentally
 // reading a garbage file is safe, but we would log an error 
which can be scary to
 // the end-user.
 !entry.getPath().getName().startsWith(".") &&
-SparkHadoopUtil.get.checkAccessPermission(entry, 
FsAction.READ) &&
-recordedFileSize(entry.getPath()) < entry.getLen()
+SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+}
+.filter { entry =>
+  try {
+val info = listing.read(classOf[LogInfo], 
entry.getPath().toString())
+if (info.fileSize < entry.getLen()) {
+  // Log size has changed, it should be parsed.
+  true
+} else {
+  // If the SHS view has a valid application, update the time 
the file was last seen so
+  // that the entry is not deleted from the SHS listing.
+  if (info.appId.isDefined) {
+listing.write(info.copy(lastProcessed = newLastScanTime))
+  }
+  false
+}
+  } catch {
+case _: NoSuchElementException =>
+  // If the file is currently not being tracked by the SHS, 
add an entry for it and try
+  // to parse it. This will allow the cleaner code to detect 
the file as stale later on
+  // if it was not possible to parse it.
+  listing.write(LogInfo(entry.getPath().toString(), 
newLastScanTime, None, None,
+entry.getLen()))
+  entry.getLen() > 0
+  }
 }
 .sortWith { case (entry1, entry2) =>
   entry1.getModificationTime() > entry2.getModificationTime()
 }
 
-  if (logInfos.nonEmpty) {
-logDebug(s"New/updated attempts found: ${logInfos.size} 
${logInfos.map(_.getPath)}")
+  if (updated.nonEmpty) {
+logDebug(s"New/updated attempts found: ${updated.size} 
${updated.map(_.getPath)}")
   }
 
-  var tasks = mutable.ListBuffer[Future[_]]()
-
-  try {
-for (file <- logInfos) {
-  tasks += replayExecutor.submit(new Runnable {
-override def run(): Unit = mergeApplicationListing(file)
+  val tasks = updated.map { entry =>
+try {
+  replayExecutor.submit(new Runnable {
+override def run(): Unit = mergeApplicationListing(entry, 
newLastScanTime)
   })
+} catch {
+  // let the iteration over logInfos break, since an exception on
--- End diff --

And actually, you've moved the try/catch, so this comment is no longer true: you'll 
continue to submit all the tasks even if one submission throws an exception. 
(I guess I'm not really sure why the old code did it that way...)
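
The difference in a nutshell (a generic sketch, not the SHS code):

    def submitAll(entries: Seq[String], submit: String => Unit): Unit = {
      // Old shape: one try around the whole loop, so the first failure stops all
      // remaining submissions.
      try {
        entries.foreach(submit)
      } catch {
        case e: Exception => println(s"stopped at the first failure: ${e.getMessage}")
      }

      // New shape: a try/catch per element, so a failure is swallowed and the loop
      // keeps submitting the remaining entries.
      entries.foreach { entry =>
        try submit(entry) catch {
          case e: Exception => println(s"skipping $entry: ${e.getMessage}")
        }
      }
    }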


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-11 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r161082423
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -405,49 +404,70 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 try {
   val newLastScanTime = getNewLastScanTime()
   logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")
-  // scan for modified applications, replay and merge them
-  val logInfos = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
+
+  val updated = Option(fs.listStatus(new 
Path(logDir))).map(_.toSeq).getOrElse(Nil)
 .filter { entry =>
   !entry.isDirectory() &&
 // FsHistoryProvider generates a hidden file which can't be 
read.  Accidentally
 // reading a garbage file is safe, but we would log an error 
which can be scary to
 // the end-user.
 !entry.getPath().getName().startsWith(".") &&
-SparkHadoopUtil.get.checkAccessPermission(entry, 
FsAction.READ) &&
-recordedFileSize(entry.getPath()) < entry.getLen()
+SparkHadoopUtil.get.checkAccessPermission(entry, FsAction.READ)
+}
+.filter { entry =>
+  try {
+val info = listing.read(classOf[LogInfo], 
entry.getPath().toString())
+if (info.fileSize < entry.getLen()) {
+  // Log size has changed, it should be parsed.
+  true
+} else {
+  // If the SHS view has a valid application, update the time 
the file was last seen so
+  // that the entry is not deleted from the SHS listing.
+  if (info.appId.isDefined) {
+listing.write(info.copy(lastProcessed = newLastScanTime))
+  }
+  false
+}
+  } catch {
+case _: NoSuchElementException =>
+  // If the file is currently not being tracked by the SHS, 
add an entry for it and try
+  // to parse it. This will allow the cleaner code to detect 
the file as stale later on
+  // if it was not possible to parse it.
+  listing.write(LogInfo(entry.getPath().toString(), 
newLastScanTime, None, None,
+entry.getLen()))
+  entry.getLen() > 0
+  }
 }
 .sortWith { case (entry1, entry2) =>
   entry1.getModificationTime() > entry2.getModificationTime()
 }
 
-  if (logInfos.nonEmpty) {
-logDebug(s"New/updated attempts found: ${logInfos.size} 
${logInfos.map(_.getPath)}")
+  if (updated.nonEmpty) {
+logDebug(s"New/updated attempts found: ${updated.size} 
${updated.map(_.getPath)}")
   }
 
-  var tasks = mutable.ListBuffer[Future[_]]()
-
-  try {
-for (file <- logInfos) {
-  tasks += replayExecutor.submit(new Runnable {
-override def run(): Unit = mergeApplicationListing(file)
+  val tasks = updated.map { entry =>
+try {
+  replayExecutor.submit(new Runnable {
+override def run(): Unit = mergeApplicationListing(entry, 
newLastScanTime)
   })
+} catch {
+  // let the iteration over logInfos break, since an exception on
--- End diff --

You've renamed `logInfos` to `updated`, so this comment needs updating.


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-04 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r159691083
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -806,6 +867,17 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   throw new NoSuchElementException(s"Cannot find attempt $attemptId of 
$appId."))
   }
 
+  private def deleteLog(log: Path): Unit = {
+try {
+  fs.delete(log, true)
+} catch {
+  case e: AccessControlException =>
--- End diff --

Nit: `e` is not used
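
The binding can simply be dropped with a wildcard; assuming the catch body mirrors the existing cleanup code (it isn't shown in this hunk), something like:

    case _: AccessControlException =>
      logInfo(s"No permission to delete $log, ignoring.")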


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-04 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r159690722
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -544,73 +621,75 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 bus.addListener(listener)
 replay(fileStatus, bus, eventsFilter = eventsFilter)
 
-listener.applicationInfo.foreach { app =>
-  // Invalidate the existing UI for the reloaded app attempt, if any. 
See LoadedAppUI for a
-  // discussion on the UI lifecycle.
-  synchronized {
-activeUIs.get((app.info.id, 
app.attempts.head.info.attemptId)).foreach { ui =>
-  ui.invalidate()
-  ui.ui.store.close()
+val (appId, attemptId) = listener.applicationInfo match {
+  case Some(app) =>
+// Invalidate the existing UI for the reloaded app attempt, if 
any. See LoadedAppUI for a
+// discussion on the UI lifecycle.
+synchronized {
+  activeUIs.get((app.info.id, 
app.attempts.head.info.attemptId)).foreach { ui =>
+ui.invalidate()
+ui.ui.store.close()
+  }
 }
-  }
 
-  addListing(app)
+addListing(app)
+(Some(app.info.id), app.attempts.head.info.attemptId)
+
+  case _ =>
+(None, None)
 }
-listing.write(new LogInfo(logPath.toString(), fileStatus.getLen()))
+listing.write(LogInfo(logPath.toString(), scanTime, appId, attemptId, 
fileStatus.getLen()))
   }
 
   /**
* Delete event logs from the log directory according to the clean 
policy defined by the user.
*/
-  private[history] def cleanLogs(): Unit = {
-var iterator: Option[KVStoreIterator[ApplicationInfoWrapper]] = None
-try {
-  val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
-
-  // Iterate descending over all applications whose oldest attempt 
happened before maxTime.
-  iterator = Some(listing.view(classOf[ApplicationInfoWrapper])
-.index("oldestAttempt")
-.reverse()
-.first(maxTime)
-.closeableIterator())
-
-  iterator.get.asScala.foreach { app =>
-// Applications may have multiple attempts, some of which may not 
need to be deleted yet.
-val (remaining, toDelete) = app.attempts.partition { attempt =>
-  attempt.info.lastUpdated.getTime() >= maxTime
-}
+  private[history] def cleanLogs(): Unit = Utils.tryLog {
+val maxTime = clock.getTimeMillis() - conf.get(MAX_LOG_AGE_S) * 1000
 
-if (remaining.nonEmpty) {
-  val newApp = new ApplicationInfoWrapper(app.info, remaining)
-  listing.write(newApp)
-}
+val expired = listing.view(classOf[ApplicationInfoWrapper])
+  .index("oldestAttempt")
+  .reverse()
+  .first(maxTime)
+  .asScala
+  .toList
+expired.foreach { app =>
+  // Applications may have multiple attempts, some of which may not 
need to be deleted yet.
+  val (remaining, toDelete) = app.attempts.partition { attempt =>
+attempt.info.lastUpdated.getTime() >= maxTime
+  }
 
-toDelete.foreach { attempt =>
-  val logPath = new Path(logDir, attempt.logPath)
-  try {
-listing.delete(classOf[LogInfo], logPath.toString())
-  } catch {
-case _: NoSuchElementException =>
-  logDebug(s"Log info entry for $logPath not found.")
-  }
-  try {
-fs.delete(logPath, true)
-  } catch {
-case e: AccessControlException =>
-  logInfo(s"No permission to delete ${attempt.logPath}, 
ignoring.")
-case t: IOException =>
-  logError(s"IOException in cleaning ${attempt.logPath}", t)
-  }
-}
+  if (remaining.nonEmpty) {
+val newApp = new ApplicationInfoWrapper(app.info, remaining)
+listing.write(newApp)
+  }
 
-if (remaining.isEmpty) {
-  listing.delete(app.getClass(), app.id)
-}
+  toDelete.foreach { attempt =>
+logInfo(s"Deleting expired event log for ${attempt.logPath}")
+val logPath = new Path(logDir, attempt.logPath)
+listing.delete(classOf[LogInfo], logPath.toString())
+cleanAppData(app.id, attempt.info.attemptId, logPath.toString())
+deleteLog(logPath)
+  }
+
+  if (remaining.isEmpty) {
+listing.delete(app.getClass(), app.id)
+  }
+}
+
+// Delete log files that don't have 

[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-04 Thread gengliangwang
Github user gengliangwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/20138#discussion_r159705360
  
--- Diff: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -806,6 +867,17 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   throw new NoSuchElementException(s"Cannot find attempt $attemptId of 
$appId."))
   }
 
+  private def deleteLog(log: Path): Unit = {
+try {
+  fs.delete(log, true)
+} catch {
+  case e: AccessControlException =>
--- End diff --

Nit: `e` is not used


---




[GitHub] spark pull request #20138: [SPARK-20664][core] Delete stale application data...

2018-01-02 Thread vanzin
GitHub user vanzin opened a pull request:

https://github.com/apache/spark/pull/20138

[SPARK-20664][core] Delete stale application data from SHS.

Detect the deletion of event log files from storage, and remove
data about the related application attempt in the SHS.

Also contains code to fix SPARK-21571 based on code by ericvandenbergfb.
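
The core idea, as a simplified sketch (not the actual patch): each scan stamps listing entries that are still backed by a log file with the new scan time, so an entry whose stamp is older than the latest scan has lost its file and its data can be dropped.

    case class LogEntry(path: String, lastProcessed: Long)

    def dropStale(entries: Seq[LogEntry], newLastScanTime: Long): Seq[LogEntry] = {
      val (live, stale) = entries.partition(_.lastProcessed >= newLastScanTime)
      stale.foreach(e => println(s"removing listing data for ${e.path}"))
      live  // only entries seen in the latest scan survive
    }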


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vanzin/spark SPARK-20664

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20138.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20138


commit 9d710d68f74e9be0cb28cea1b210ec69fe8ccf4d
Author: Marcelo Vanzin 
Date:   2017-05-08T23:11:00Z

[SPARK-20664][core] Delete stale application data from SHS.

Detect the deletion of event log files from storage, and remove
data about the related application attempt in the SHS.

Also contains code to fix SPARK-21571 based on code by ericvandenbergfb.




---
