[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-27 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r371572754
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ##
 @@ -1362,6 +1365,110 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with Matchers with Logging {
 }
   }
 
+  test("compact event log files") {
+def verifyEventLogFiles(
+fs: FileSystem,
+rootPath: String,
+expectedIndexForCompact: Option[Long],
+expectedIndicesForNonCompact: Seq[Long]): Unit = {
+  val reader = EventLogFileReader(fs, new Path(rootPath)).get
+  var logFiles = reader.listEventLogFiles
+
+  expectedIndexForCompact.foreach { idx =>
+val headFile = logFiles.head
+assert(EventLogFileWriter.isCompacted(headFile.getPath))
+assert(idx == RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName))
+logFiles = logFiles.drop(1)
+  }
+
+  assert(logFiles.size === expectedIndicesForNonCompact.size)
+
+  logFiles.foreach { logFile =>
+assert(RollingEventLogFilesWriter.isEventLogFile(logFile))
+assert(!EventLogFileWriter.isCompacted(logFile.getPath))
+  }
+
+  val indices = logFiles.map { logFile =>
+    RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName)
+  }
+  assert(expectedIndicesForNonCompact === indices)
+}
+
+withTempDir { dir =>
+  val conf = createTestConf()
+  conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+  conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+  conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+  val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+  val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+  val provider = new FsHistoryProvider(conf)
+
+  val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+  writer.start()
+
+  // writing event log file 1 - don't compact for now
+  writeEventsToRollingWriter(writer, Seq(
+SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+  updateAndCheck(provider) { _ =>
+verifyEventLogFiles(fs, writer.logPath, None, Seq(1))
+val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+assert(info.lastEvaluatedForCompaction === Some(1))
+  }
+
+  // writing event log file 2 - compact the event log file 1 into 1.compact
+  writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+  writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1),
+SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false)
+
+  updateAndCheck(provider) { _ =>
+verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
+val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+assert(info.lastEvaluatedForCompaction === Some(2))
+  }
+
+  // writing event log file 3 - compact two files - 1.compact & 2 into one, 2.compact
+  writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+  writeEventsToRollingWriter(writer, Seq(
+SparkListenerExecutorAdded(3, "exec1", new ExecutorInfo("host1", 1, Map.empty)),
+SparkListenerJobStart(2, 4, Seq.empty),
+SparkListenerJobEnd(2, 5, JobSucceeded)), rollFile = false)
+
+  writer.stop()
+
+  updateAndCheck(provider) { _ =>
+verifyEventLogFiles(fs, writer.logPath, Some(2), Seq(3))
+
+val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+assert(info.lastEvaluatedForCompaction === Some(3))
+
+val store = new InMemoryStore
+val appStore = new AppStatusStore(store)
+
+val reader = EventLogFileReader(fs, new Path(writer.logPath)).get
+provider.rebuildAppStore(store, reader, 0L)
+
+// replayed store doesn't have any job, as events for job are removed while compacting
+intercept[NoSuchElementException] {
+  appStore.job(1)
+}
+
+// but other events should be available even they were in original files to compact
+val appInfo = appStore.applicationInfo()
+assert(appInfo.id === "app")
+assert(appInfo.name === "app")
+
+// all events in retained file should be available, even they're related to finished jobs
 
 Review comment:
   all execs? (also, should be "all live execs"?)




[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-23 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r370234929
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -1175,6 +1237,21 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
 }
 deleted
   }
+
+  /** NOTE: 'task' should ensure it executes 'endProcessing' at the end */
 
 Review comment:
   Hmm, perhaps this method could take care of calling `endProcessing` too 
(e.g. by wrapping the task)? Should be just a small adjustment at the call site.
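
   A rough sketch of what that wrapping could look like (the helper name `submitLogProcessTask` and its exact shape are illustrative, not code from this PR; `processing`, `endProcessing`, `replayExecutor` and `logError` are the existing members already used in the diff above):

```scala
// Hypothetical helper inside FsHistoryProvider: one place owns both the submission
// and the cleanup, so individual tasks can't forget to call endProcessing.
private def submitLogProcessTask(rootPath: Path)(fn: => Unit): Unit = {
  try {
    processing(rootPath)
    val task: Runnable = () => {
      try {
        fn
      } finally {
        endProcessing(rootPath)  // runs even if fn throws
      }
    }
    replayExecutor.submit(task)
  } catch {
    // a rejected submission means the task will never run, so undo the mark here
    case e: Exception =>
      logError("Exception while submitting task", e)
      endProcessing(rootPath)
  }
}
```

   Call sites then shrink to something like `submitLogProcessTask(rootPath) { compact(reader) }` instead of managing `processing`/`endProcessing` themselves.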





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-23 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r370235357
 
 

 ##
 File path: docs/configuration.md
 ##
 @@ -1023,6 +1023,26 @@ Apart from these, the following properties are also available, and may be useful
 The max size of event log file before it's rolled over.
   
 
+
+  spark.history.fs.eventLog.rolling.maxFilesToRetain
 
 Review comment:
   This actually should be with the other SHS configs in `monitoring.md`.





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-22 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r369678720
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -795,15 +800,42 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
// mean the end event is before the configured threshold, so call the method again to
// re-parse the whole log.
logInfo(s"Reparsing $logPath since end event was not found.")
-doMergeApplicationListing(reader, scanTime, enableOptimizations = false)
+doMergeApplicationListing(reader, scanTime, enableOptimizations = false,
+  lastEvaluatedForCompaction)
 
   case _ =>
// If the app hasn't written down its app ID to the logs, still record the entry in the
// listing db, with an empty ID. This will make the log eligible for deletion if the app
// does not make progress after the configured max log age.
listing.write(
  LogInfo(logPath.toString(), scanTime, LogType.EventLogs, None, None,
-reader.fileSizeForLastIndex, reader.lastIndex, reader.completed))
+reader.fileSizeForLastIndex, reader.lastIndex, lastEvaluatedForCompaction,
+reader.completed))
+}
+  }
+
+  private def compact(reader: EventLogFileReader): Unit = {
+val rootPath = reader.rootPath
+try {
+  reader.lastIndex match {
+case Some(lastIndex) =>
+  try {
+val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
+if (info.lastEvaluatedForCompaction.isEmpty ||
+  info.lastEvaluatedForCompaction.get < lastIndex) {
+  // haven't tried compaction for this index, do compaction
+  fileCompactor.compact(reader.listEventLogFiles)
 
 Review comment:
   So one thing that feels a tiny bit odd is that when deciding whether to 
compact, you're actually considering the last log file, which you won't 
consider during actual compaction, right?
   
   Wouldn't that cause unnecessary (or too aggressive) compaction at the end of 
the application, when potentially a bunch of jobs finish and "release" lots of 
tasks, inflating the compaction score?
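
   To make the concern concrete, here is a toy, non-Spark sketch of how including the still-open last file can tip the score; the score function and the numbers are invented purely for illustration:

```scala
object CompactionScoreToy {
  // pretend "score" is the fraction of events that compaction could drop
  def score(droppable: Seq[Int], total: Seq[Int]): Double =
    droppable.sum.toDouble / total.sum

  def main(args: Array[String]): Unit = {
    val total     = Seq(100, 100, 100)  // event counts in files 1, 2 and the live file 3
    val droppable = Seq(10, 10, 80)     // file 3 just saw a burst of job-end events

    // counting the live file (which the compactor won't rewrite) inflates the score
    println(f"with last file:    ${score(droppable, total)}%.2f")  // 0.33
    // looking only at the files that would actually be rewritten, it's much lower
    println(f"without last file: ${score(droppable.dropRight(1), total.dropRight(1))}%.2f")  // 0.10
  }
}
```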





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-22 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r369671282
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -795,15 +800,42 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
// mean the end event is before the configured threshold, so call the method again to
// re-parse the whole log.
logInfo(s"Reparsing $logPath since end event was not found.")
-doMergeApplicationListing(reader, scanTime, enableOptimizations = false)
+doMergeApplicationListing(reader, scanTime, enableOptimizations = false,
+  lastEvaluatedForCompaction)
 
   case _ =>
// If the app hasn't written down its app ID to the logs, still record the entry in the
// listing db, with an empty ID. This will make the log eligible for deletion if the app
// does not make progress after the configured max log age.
listing.write(
  LogInfo(logPath.toString(), scanTime, LogType.EventLogs, None, None,
-reader.fileSizeForLastIndex, reader.lastIndex, reader.completed))
+reader.fileSizeForLastIndex, reader.lastIndex, lastEvaluatedForCompaction,
+reader.completed))
+}
+  }
+
+  private def compact(reader: EventLogFileReader): Unit = {
+val rootPath = reader.rootPath
+try {
+  reader.lastIndex match {
+case Some(lastIndex) =>
+  try {
+val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
+if (info.lastEvaluatedForCompaction.isEmpty ||
+  info.lastEvaluatedForCompaction.get < lastIndex) {
 
 Review comment:
   indent more
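
   i.e. give the wrapped condition its own extra level of indentation so it does not line up with the body, roughly like this (exact spacing per the project style, this just shows the shape being asked for):

```scala
val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
if (info.lastEvaluatedForCompaction.isEmpty ||
    info.lastEvaluatedForCompaction.get < lastIndex) {
  // haven't tried compaction for this index, do compaction
  fileCompactor.compact(reader.listEventLogFiles)
}
```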





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-22 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r369677061
 
 

 ##
 File path: docs/configuration.md
 ##
 @@ -1023,6 +1023,26 @@ Apart from these, the following properties are also 
available, and may be useful
 The max size of event log file before it's rolled over.
   
 
+
+  spark.history.fs.eventLog.rolling.maxFilesToRetain
+  Int.MaxValue
+  
+The maximum number of event log files which will be retained as non-compacted. By default,
 
 Review comment:
   Spark currently doesn't have a "release notes" document in the docs dir, but 
I'm wondering if we should mention somewhere that this is a new feature and may 
not be completely stable (i.e. use with caution, it may delete more data than 
you expect, cause some UI issues we haven't thought about, etc). There are 
migration guides, but that doesn't seem like the right place...





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-22 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r369663751
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -720,6 +691,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 } finally {
   endProcessing(rootPath)
   pendingReplayTasksCount.decrementAndGet()
+
+  // triggering another task for compaction task
+  try {
+processing(rootPath)
+val task: Runnable = () => compact(reader)
+replayExecutor.submit(task)
+  } catch {
+// let the iteration over the updated entries break, since an exception on
+// replayExecutor.submit (..) indicates the ExecutorService is unable
+// to take any more submissions at this time
+case e: Exception =>
+  logError(s"Exception while submitting task for compaction", e)
+  endProcessing(rootPath)
 
 Review comment:
   >  if replayExecutor.submit fails 
   
   That should be extremely rare, but seems simple to handle.





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-17 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r368090089
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ##
 @@ -1362,6 +1365,110 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with Matchers with Logging {
 }
   }
 
+  test("compact event log files") {
+def verifyEventLogFiles(
+fs: FileSystem,
+rootPath: String,
+expectedIndexForCompact: Option[Long],
+expectedIndicesForNonCompact: Seq[Long]): Unit = {
+  val reader = EventLogFileReader(fs, new Path(rootPath)).get
+  var logFiles = reader.listEventLogFiles
+
+  expectedIndexForCompact.foreach { idx =>
+val headFile = logFiles.head
+assert(EventLogFileWriter.isCompacted(headFile.getPath))
+assert(idx == RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName))
+logFiles = logFiles.drop(1)
+  }
+
+  assert(logFiles.size === expectedIndicesForNonCompact.size)
+
+  logFiles.foreach { logFile =>
+assert(RollingEventLogFilesWriter.isEventLogFile(logFile))
+assert(!EventLogFileWriter.isCompacted(logFile.getPath))
+  }
+
+  val indices = logFiles.map { logFile =>
+    RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName)
+  }
+  assert(expectedIndicesForNonCompact === indices)
+}
+
+withTempDir { dir =>
+  val conf = createTestConf()
+  conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+  conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+  conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+  val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+  val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+  val provider = new FsHistoryProvider(conf)
+
+  val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+  writer.start()
+
+  // writing event log file 1 - don't compact for now
+  writeEventsToRollingWriter(writer, Seq(
+SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+  updateAndCheck(provider) { _ =>
+verifyEventLogFiles(fs, writer.logPath, None, Seq(1))
+val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+assert(info.lastIndexToRunCompaction === Some(1))
+  }
+
+  // writing event log file 2 - compact the event log file 1 into 1.compact
+  writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+  writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1),
+SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false)
+
+  updateAndCheck(provider) { _ =>
+verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
+val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+assert(info.lastIndexToRunCompaction === Some(2))
 
 Review comment:
   If my understanding is correct then perhaps a better name is 
`lastEvaluatedForCompaction` (didn't add "index" because the name is already 
pretty long as it is).





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-17 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r36807
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ##
 @@ -1362,6 +1365,110 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with Matchers with Logging {
 }
   }
 
+  test("compact event log files") {
+def verifyEventLogFiles(
+fs: FileSystem,
+rootPath: String,
+expectedIndexForCompact: Option[Long],
+expectedIndicesForNonCompact: Seq[Long]): Unit = {
+  val reader = EventLogFileReader(fs, new Path(rootPath)).get
+  var logFiles = reader.listEventLogFiles
+
+  expectedIndexForCompact.foreach { idx =>
+val headFile = logFiles.head
+assert(EventLogFileWriter.isCompacted(headFile.getPath))
+assert(idx == RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName))
+logFiles = logFiles.drop(1)
+  }
+
+  assert(logFiles.size === expectedIndicesForNonCompact.size)
+
+  logFiles.foreach { logFile =>
+assert(RollingEventLogFilesWriter.isEventLogFile(logFile))
+assert(!EventLogFileWriter.isCompacted(logFile.getPath))
+  }
+
+  val indices = logFiles.map { logFile =>
+    RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName)
+  }
+  assert(expectedIndicesForNonCompact === indices)
+}
+
+withTempDir { dir =>
+  val conf = createTestConf()
+  conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+  conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+  conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+  val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+  val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+  val provider = new FsHistoryProvider(conf)
+
+  val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+  writer.start()
+
+  // writing event log file 1 - don't compact for now
+  writeEventsToRollingWriter(writer, Seq(
+SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+  updateAndCheck(provider) { _ =>
+verifyEventLogFiles(fs, writer.logPath, None, Seq(1))
+val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+assert(info.lastIndexToRunCompaction === Some(1))
+  }
+
+  // writing event log file 2 - compact the event log file 1 into 1.compact
+  writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+  writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1),
+SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false)
+
+  updateAndCheck(provider) { _ =>
+verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
+val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+assert(info.lastIndexToRunCompaction === Some(2))
 
 Review comment:
   From that comment it seems that what you're saying is that, in this case, 
the log with index "2" isn't yet compacted, but it's been processed and found 
that it's not yet worth it to compact it. Is that right?
   
   I think the variable name is confusing because it mixes "last" (which to me 
evokes something that has happened already) and "to run" (which evokes a future 
action). So I can't really parse the meaning of the name since it seems to 
contradict itself.
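
   For what it's worth, the semantics that seem to be intended could be spelled out on the field itself; the wording below is only a suggestion to pin down the meaning being discussed, not text from the PR:

```scala
private[history] case class LogInfo(
    // ... the existing listing fields ...
    /**
     * Highest event log file index that has been evaluated for compaction so far.
     * Evaluation may decide not to compact (score below the threshold), so this can
     * point at a file that still exists in its original, non-compacted form.
     */
    lastEvaluatedForCompaction: Option[Long])
```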





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-17 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r368086444
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -197,8 +197,6 @@ package object config {
 
   private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN =
 ConfigBuilder("spark.eventLog.rolling.maxFilesToRetain")
 
 Review comment:
   Makes sense to keep the score threshold internal. We can make it public 
later if there's a good reason for it.
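
   For reference, keeping it internal is just the `.internal()` marker on the `ConfigBuilder` declaration; the key, doc text and default below are placeholders rather than necessarily what the PR ends up using:

```scala
private[spark] val EVENT_LOG_COMPACTION_SCORE_THRESHOLD =
  ConfigBuilder("spark.history.fs.eventLog.rolling.compaction.score.threshold")
    .internal()
    .doc("Score threshold above which an evaluated group of event log files is compacted.")
    .doubleConf
    .createWithDefault(0.7d)
```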





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-17 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r368086161
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -661,26 +691,33 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   reader: EventLogFileReader,
   scanTime: Long,
   enableOptimizations: Boolean): Unit = {
+val rootPath = reader.rootPath
 try {
+  val (shouldReload, lastCompactionIndex) = compact(reader)
 
 Review comment:
   I was thinking that instead of doing this inline here, you could just submit 
a task to `replayExecutor` after the listing data is updated, so that you give 
other tasks fetching listing data a chance to run before you try compaction. 
(You'd call `endProcessing()` at the end of that separate task.)
   
   Any reason why that would not work?





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-15 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367137498
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -569,6 +570,35 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 }
   }
 
+  /**
+   * Returns a tuple containing two values. Each element means:
+   * - 1st (Boolean): true if the list of event log files are changed, false otherwise.
+   * - 2nd (Option[Long]): Some(value) if the method succeeds to try compaction,
+   *   value indicates the last event log index to try compaction. None otherwise.
+   */
+  private def compact(reader: EventLogFileReader): (Boolean, Option[Long]) = {
+reader.lastIndex match {
+  case Some(lastIndex) =>
+try {
+  val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
+  if (info.lastIndexToRunCompaction.isEmpty ||
+info.lastIndexToRunCompaction.get < lastIndex) {
 
 Review comment:
   indent more





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-15 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367137936
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -569,6 +570,35 @@ private[history] class FsHistoryProvider(conf: SparkConf, 
clock: Clock)
 }
   }
 
+  /**
+   * Returns a tuple containing two values. Each element means:
+   * - 1st (Boolean): true if the list of event log files are changed, false otherwise.
+   * - 2nd (Option[Long]): Some(value) if the method succeeds to try compaction,
+   *   value indicates the last event log index to try compaction. None otherwise.
 
 Review comment:
   what does it mean to "try compaction"? Does it mean that when this method 
returns, no compaction was actually done, it was just tried?
   
   The tuple being returned sounds a bit confusing. Instead, why not return 
just an `Option[Long]` telling you both whether compaction ran, and what's the 
index of the first non-compacted file (or last compacted file, not sure what's 
being tracked really)?
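
   A sketch of the suggested signature (the body is simplified, and `alreadyEvaluated` is a made-up placeholder for however that check ends up being written):

```scala
/**
 * Evaluates the rolled event log files for compaction, possibly compacting them, and
 * returns the last file index that was evaluated, or None if nothing new was evaluated.
 */
private def compact(reader: EventLogFileReader): Option[Long] = {
  reader.lastIndex.flatMap { lastIndex =>
    if (alreadyEvaluated(reader.rootPath, lastIndex)) {
      None  // nothing new since the previous evaluation
    } else {
      fileCompactor.compact(reader.listEventLogFiles)
      Some(lastIndex)  // evaluated (and possibly compacted) up to this index
    }
  }
}
```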





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-15 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367142717
 
 

 ##
 File path: 
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
 ##
 @@ -661,26 +691,33 @@ private[history] class FsHistoryProvider(conf: 
SparkConf, clock: Clock)
   reader: EventLogFileReader,
   scanTime: Long,
   enableOptimizations: Boolean): Unit = {
+val rootPath = reader.rootPath
 try {
+  val (shouldReload, lastCompactionIndex) = compact(reader)
 
 Review comment:
   Is there any way to do compaction as a separate task? Otherwise it seems like 
this could slow down the creation of the app listing, especially if you're not 
using disk caching or it's the first time the SHS is creating the listing db.





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-15 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367146348
 
 

 ##
 File path: core/src/main/scala/org/apache/spark/internal/config/package.scala
 ##
 @@ -197,8 +197,6 @@ package object config {
 
   private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN =
 ConfigBuilder("spark.eventLog.rolling.maxFilesToRetain")
 
 Review comment:
   Hmm... this is kind of a history server config, right? Or even more 
specifically, a `FsHistoryProvider` config.
   
   Might be better to have it grouped with other configs in `History.scala`, 
and make it a constructor argument in `EventLogFileCompactor`.
   
   (In fact the same applies to `EVENT_LOG_COMPACTION_SCORE_THRESHOLD`. Should 
that be not `internal()` anymore, also?)
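
   Something along these lines, where the names and the `EventLogFileCompactor` parameter list are assumptions about how that refactor could look rather than the final code:

```scala
// grouped with the other SHS configs in org.apache.spark.internal.config.History
val MAX_LOG_FILES_TO_RETAIN =
  ConfigBuilder("spark.history.fs.eventLog.rolling.maxFilesToRetain")
    .intConf
    .checkValue(_ > 0, "Max event log files to retain must be positive.")
    .createWithDefault(Int.MaxValue)

// FsHistoryProvider reads the configs once and hands plain values to the compactor
class EventLogFileCompactor(
    sparkConf: SparkConf,
    hadoopConf: Configuration,
    fs: FileSystem,
    maxFilesToRetain: Int,
    compactionThresholdScore: Double) {
  // ...
}
```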





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-15 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367148994
 
 

 ##
 File path: 
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
 ##
 @@ -1362,6 +1365,110 @@ class FsHistoryProviderSuite extends SparkFunSuite 
with Matchers with Logging {
 }
   }
 
+  test("compact event log files") {
+def verifyEventLogFiles(
+fs: FileSystem,
+rootPath: String,
+expectedIndexForCompact: Option[Long],
+expectedIndicesForNonCompact: Seq[Long]): Unit = {
+  val reader = EventLogFileReader(fs, new Path(rootPath)).get
+  var logFiles = reader.listEventLogFiles
+
+  expectedIndexForCompact.foreach { idx =>
+val headFile = logFiles.head
+assert(EventLogFileWriter.isCompacted(headFile.getPath))
+assert(idx == RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName))
+logFiles = logFiles.drop(1)
+  }
+
+  assert(logFiles.size === expectedIndicesForNonCompact.size)
+
+  logFiles.foreach { logFile =>
+assert(RollingEventLogFilesWriter.isEventLogFile(logFile))
+assert(!EventLogFileWriter.isCompacted(logFile.getPath))
+  }
+
+  val indices = logFiles.map { logFile =>
+    RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName)
+  }
+  assert(expectedIndicesForNonCompact === indices)
+}
+
+withTempDir { dir =>
+  val conf = createTestConf()
+  conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+  conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+  conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+  val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+  val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+  val provider = new FsHistoryProvider(conf)
+
+  val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+  writer.start()
+
+  // writing event log file 1 - don't compact for now
+  writeEventsToRollingWriter(writer, Seq(
+SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+  updateAndCheck(provider) { _ =>
+verifyEventLogFiles(fs, writer.logPath, None, Seq(1))
+val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+assert(info.lastIndexToRunCompaction === Some(1))
+  }
+
+  // writing event log file 2 - compact the event log file 1 into 1.compact
+  writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+  writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1),
+SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false)
+
+  updateAndCheck(provider) { _ =>
+verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
+val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+assert(info.lastIndexToRunCompaction === Some(2))
 
 Review comment:
   I'm a little confused by this check. `2` was not compacted, right? Only `1` 
was. So why is `lastIndexToRunCompaction` 2?





[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server

2020-01-15 Thread GitBox
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] 
Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367149532
 
 

 ##
 File path: docs/configuration.md
 ##
 @@ -1023,6 +1023,24 @@ Apart from these, the following properties are also 
available, and may be useful
 The max size of event log file before it's rolled over.
   
 
+
+  spark.eventLog.rolling.maxFilesToRetain
+  Int.MaxValue
+  
+The maximum number of event log files which will be retained as non-compacted.
+By default, all event log files will be retained. Please set the configuration and
 
 Review comment:
   Might be good to clarify where to configure things, since this config is for 
the history server, but `spark.eventLog.rolling.maxFileSize` needs to be set in 
the applications.
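
   For example, the split could be called out explicitly; the keys below are the rolling event log / compaction ones, the values are arbitrary examples, and the point is simply where each one is set:

```scala
import org.apache.spark.SparkConf

// application side: rolling of event logs is configured by the app that writes them
val appConf = new SparkConf()
  .set("spark.eventLog.rolling.enabled", "true")
  .set("spark.eventLog.rolling.maxFileSize", "128m")

// history server side: retention/compaction is configured where the SHS runs,
// e.g. in the SHS's spark-defaults.conf:
//   spark.history.fs.eventLog.rolling.maxFilesToRetain   2
```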

