[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r371572754

## File path: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ##
@@ -1362,6 +1365,110 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     }
   }
+  test("compact event log files") {
+    def verifyEventLogFiles(
+        fs: FileSystem,
+        rootPath: String,
+        expectedIndexForCompact: Option[Long],
+        expectedIndicesForNonCompact: Seq[Long]): Unit = {
+      val reader = EventLogFileReader(fs, new Path(rootPath)).get
+      var logFiles = reader.listEventLogFiles
+
+      expectedIndexForCompact.foreach { idx =>
+        val headFile = logFiles.head
+        assert(EventLogFileWriter.isCompacted(headFile.getPath))
+        assert(idx == RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName))
+        logFiles = logFiles.drop(1)
+      }
+
+      assert(logFiles.size === expectedIndicesForNonCompact.size)
+
+      logFiles.foreach { logFile =>
+        assert(RollingEventLogFilesWriter.isEventLogFile(logFile))
+        assert(!EventLogFileWriter.isCompacted(logFile.getPath))
+      }
+
+      val indices = logFiles.map { logFile =>
+        RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName)
+      }
+      assert(expectedIndicesForNonCompact === indices)
+    }
+
+    withTempDir { dir =>
+      val conf = createTestConf()
+      conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+      conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+      conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val provider = new FsHistoryProvider(conf)
+
+      val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+      writer.start()
+
+      // writing event log file 1 - don't compact for now
+      writeEventsToRollingWriter(writer, Seq(
+        SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, None, Seq(1))
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastEvaluatedForCompaction === Some(1))
+      }
+
+      // writing event log file 2 - compact the event log file 1 into 1.compact
+      writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+      writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1),
+        SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false)
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastEvaluatedForCompaction === Some(2))
+      }
+
+      // writing event log file 3 - compact two files - 1.compact & 2 into one, 2.compact
+      writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+      writeEventsToRollingWriter(writer, Seq(
+        SparkListenerExecutorAdded(3, "exec1", new ExecutorInfo("host1", 1, Map.empty)),
+        SparkListenerJobStart(2, 4, Seq.empty),
+        SparkListenerJobEnd(2, 5, JobSucceeded)), rollFile = false)
+
+      writer.stop()
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, Some(2), Seq(3))
+
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastEvaluatedForCompaction === Some(3))
+
+        val store = new InMemoryStore
+        val appStore = new AppStatusStore(store)
+
+        val reader = EventLogFileReader(fs, new Path(writer.logPath)).get
+        provider.rebuildAppStore(store, reader, 0L)
+
+        // replayed store doesn't have any job, as events for job are removed while compacting
+        intercept[NoSuchElementException] {
+          appStore.job(1)
+        }
+
+        // but other events should be available even they were in original files to compact
+        val appInfo = appStore.applicationInfo()
+        assert(appInfo.id === "app")
+        assert(appInfo.name === "app")
+
+        // all events in retained file should be available, even they're related to finished jobs

Review comment:
All execs? (Also, should it be "all live execs"?)

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r370234929

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -1175,6 +1237,21 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       }
     deleted
   }
+
+  /** NOTE: 'task' should ensure it executes 'endProcessing' at the end */

Review comment:
Hmm, perhaps this method could take care of calling `endProcessing` too (e.g. by wrapping the task)? It should be just a small adjustment at the call site.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
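A minimal sketch of the wrapping idea, assuming a simplified stand-in for the provider's `processing`/`endProcessing` bookkeeping. `ProcessingTracker` and `submitForProcessing` are hypothetical names, not part of `FsHistoryProvider`. The wrapper owns the `endProcessing` call, so individual tasks no longer need to remember it, and it also releases the path if the submission itself is rejected.

```scala
import java.util.concurrent.{ConcurrentHashMap, ExecutorService}

// Hypothetical stand-in for FsHistoryProvider's "processing" bookkeeping.
class ProcessingTracker(executor: ExecutorService) {
  private val inProgress = ConcurrentHashMap.newKeySet[String]()

  def isProcessing(path: String): Boolean = inProgress.contains(path)

  private def processing(path: String): Unit = inProgress.add(path)

  private def endProcessing(path: String): Unit = inProgress.remove(path)

  // Marks `path` as being processed, then runs `task` on the executor.
  // endProcessing always runs once the task finishes, even if the task throws.
  def submitForProcessing(path: String)(task: => Unit): Unit = {
    processing(path)
    try {
      executor.submit(new Runnable {
        override def run(): Unit =
          try task finally endProcessing(path)
      })
    } catch {
      case e: Exception =>
        // The task will never run (e.g. RejectedExecutionException), so release here.
        endProcessing(path)
        throw e
    }
  }
}
```

With this shape, the call site only passes the body (for instance `submitForProcessing(rootPath) { compact(reader) }`), and the comment on the method is no longer needed.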
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r370235357

## File path: docs/configuration.md ##
@@ -1023,6 +1023,26 @@ Apart from these, the following properties are also available, and may be useful
     The max size of event log file before it's rolled over.
+
+  spark.history.fs.eventLog.rolling.maxFilesToRetain

Review comment:
This should actually go with the other SHS configs in `monitoring.md`.
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r369678720

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -795,15 +800,42 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           // mean the end event is before the configured threshold, so call the method again to
           // re-parse the whole log.
           logInfo(s"Reparsing $logPath since end event was not found.")
-          doMergeApplicationListing(reader, scanTime, enableOptimizations = false)
+          doMergeApplicationListing(reader, scanTime, enableOptimizations = false,
+            lastEvaluatedForCompaction)
         case _ =>
           // If the app hasn't written down its app ID to the logs, still record the entry in the
           // listing db, with an empty ID. This will make the log eligible for deletion if the app
           // does not make progress after the configured max log age.
           listing.write(
             LogInfo(logPath.toString(), scanTime, LogType.EventLogs, None, None,
-              reader.fileSizeForLastIndex, reader.lastIndex, reader.completed))
+              reader.fileSizeForLastIndex, reader.lastIndex, lastEvaluatedForCompaction,
+              reader.completed))
+      }
+  }
+
+  private def compact(reader: EventLogFileReader): Unit = {
+    val rootPath = reader.rootPath
+    try {
+      reader.lastIndex match {
+        case Some(lastIndex) =>
+          try {
+            val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
+            if (info.lastEvaluatedForCompaction.isEmpty ||
+              info.lastEvaluatedForCompaction.get < lastIndex) {
+              // haven't tried compaction for this index, do compaction
+              fileCompactor.compact(reader.listEventLogFiles)

Review comment:
So one thing that feels a tiny bit odd is that when deciding whether to compact, you're actually considering the last log file, which you won't consider during the actual compaction, right?

Wouldn't that cause unnecessary (or too aggressive) compaction at the end of the application, when potentially a bunch of jobs finish and "release" lots of tasks, inflating the compaction score?
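One way to make the decision match what the compactor actually rewrites is to score only the candidate files, i.e. everything except the last `maxFilesToRetain` files. A minimal sketch, assuming hypothetical per-file counts of total and droppable events. `LogFileStats` and `CompactionDecision` are illustrative names, and this is not how Spark's compactor computes its score.

```scala
// Hypothetical per-file statistics: how many events a file holds and how many
// of them compaction could drop (e.g. events of finished jobs).
final case class LogFileStats(index: Long, totalEvents: Long, droppableEvents: Long)

object CompactionDecision {
  // Score only the files the compactor would rewrite: everything except the
  // newest `maxFilesToRetain` files, which stay uncompacted.
  def score(files: Seq[LogFileStats], maxFilesToRetain: Int): Option[Double] = {
    val candidates = files.sortBy(_.index).dropRight(maxFilesToRetain)
    val total = candidates.map(_.totalEvents).sum
    if (candidates.isEmpty || total == 0) None
    else Some(candidates.map(_.droppableEvents).sum.toDouble / total)
  }

  def shouldCompact(files: Seq[LogFileStats], maxFilesToRetain: Int, threshold: Double): Boolean =
    score(files, maxFilesToRetain).exists(_ >= threshold)
}
```

In this sketch, a burst of finished jobs in the newest (retained) file does not raise the score, because that file is excluded before scoring.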
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r369671282

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -795,15 +800,42 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
           // mean the end event is before the configured threshold, so call the method again to
           // re-parse the whole log.
           logInfo(s"Reparsing $logPath since end event was not found.")
-          doMergeApplicationListing(reader, scanTime, enableOptimizations = false)
+          doMergeApplicationListing(reader, scanTime, enableOptimizations = false,
+            lastEvaluatedForCompaction)
         case _ =>
           // If the app hasn't written down its app ID to the logs, still record the entry in the
           // listing db, with an empty ID. This will make the log eligible for deletion if the app
           // does not make progress after the configured max log age.
           listing.write(
             LogInfo(logPath.toString(), scanTime, LogType.EventLogs, None, None,
-              reader.fileSizeForLastIndex, reader.lastIndex, reader.completed))
+              reader.fileSizeForLastIndex, reader.lastIndex, lastEvaluatedForCompaction,
+              reader.completed))
+      }
+  }
+
+  private def compact(reader: EventLogFileReader): Unit = {
+    val rootPath = reader.rootPath
+    try {
+      reader.lastIndex match {
+        case Some(lastIndex) =>
+          try {
+            val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
+            if (info.lastEvaluatedForCompaction.isEmpty ||
+              info.lastEvaluatedForCompaction.get < lastIndex) {

Review comment:
indent more
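A sketch of two common ways to address that, using a hypothetical `LogInfoSketch` in place of `LogInfo`. The first indents the continuation line further (four spaces here, an assumption about the preferred style) so it does not line up with the body of the `if`. The second collapses the check with `Option.forall`, which is `true` for `None` and otherwise applies the predicate.

```scala
// Hypothetical stand-in for LogInfo with only the field that matters here.
final case class LogInfoSketch(logPath: String, lastEvaluatedForCompaction: Option[Long])

object CompactionCheck {
  // Continuation line indented beyond the body, so the condition reads as one unit.
  def decision(info: LogInfoSketch, lastIndex: Long): String = {
    if (info.lastEvaluatedForCompaction.isEmpty ||
        info.lastEvaluatedForCompaction.get < lastIndex) {
      "evaluate"
    } else {
      "skip"
    }
  }

  // Same check without `.get`: None => true, Some(idx) => idx < lastIndex.
  def needsEvaluation(info: LogInfoSketch, lastIndex: Long): Boolean =
    info.lastEvaluatedForCompaction.forall(_ < lastIndex)
}
```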
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r369677061

## File path: docs/configuration.md ##
@@ -1023,6 +1023,26 @@ Apart from these, the following properties are also available, and may be useful
     The max size of event log file before it's rolled over.
+
+  spark.history.fs.eventLog.rolling.maxFilesToRetain
+  Int.MaxValue
+
+    The maximum number of event log files which will be retained as non-compacted. By default,

Review comment:
Spark currently doesn't have a "release notes" document in the docs dir, but I'm wondering if we should mention somewhere that this is a new feature and may not be completely stable (i.e. use it with caution: it may delete more data than you expect, cause some UI issues we haven't thought about, etc.). There are migration guides, but that doesn't seem like the right place...
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r369663751

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -720,6 +691,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     } finally {
       endProcessing(rootPath)
       pendingReplayTasksCount.decrementAndGet()
+
+      // triggering another task for compaction task
+      try {
+        processing(rootPath)
+        val task: Runnable = () => compact(reader)
+        replayExecutor.submit(task)
+      } catch {
+        // let the iteration over the updated entries break, since an exception on
+        // replayExecutor.submit (..) indicates the ExecutorService is unable
+        // to take any more submissions at this time
+        case e: Exception =>
+          logError(s"Exception while submitting task for compaction", e)
+          endProcessing(rootPath)

Review comment:
> if replayExecutor.submit fails

That should be extremely rare, but it seems simple to handle.
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r368090089

## File path: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ##
@@ -1362,6 +1365,110 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     }
   }
+  test("compact event log files") {
+    def verifyEventLogFiles(
+        fs: FileSystem,
+        rootPath: String,
+        expectedIndexForCompact: Option[Long],
+        expectedIndicesForNonCompact: Seq[Long]): Unit = {
+      val reader = EventLogFileReader(fs, new Path(rootPath)).get
+      var logFiles = reader.listEventLogFiles
+
+      expectedIndexForCompact.foreach { idx =>
+        val headFile = logFiles.head
+        assert(EventLogFileWriter.isCompacted(headFile.getPath))
+        assert(idx == RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName))
+        logFiles = logFiles.drop(1)
+      }
+
+      assert(logFiles.size === expectedIndicesForNonCompact.size)
+
+      logFiles.foreach { logFile =>
+        assert(RollingEventLogFilesWriter.isEventLogFile(logFile))
+        assert(!EventLogFileWriter.isCompacted(logFile.getPath))
+      }
+
+      val indices = logFiles.map { logFile =>
+        RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName)
+      }
+      assert(expectedIndicesForNonCompact === indices)
+    }
+
+    withTempDir { dir =>
+      val conf = createTestConf()
+      conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+      conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+      conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val provider = new FsHistoryProvider(conf)
+
+      val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+      writer.start()
+
+      // writing event log file 1 - don't compact for now
+      writeEventsToRollingWriter(writer, Seq(
+        SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, None, Seq(1))
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastIndexToRunCompaction === Some(1))
+      }
+
+      // writing event log file 2 - compact the event log file 1 into 1.compact
+      writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+      writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1),
+        SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false)
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastIndexToRunCompaction === Some(2))

Review comment:
If my understanding is correct, then perhaps a better name is `lastEvaluatedForCompaction` (I didn't add "index" because the name is already pretty long as it is).
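To make the proposed name concrete: under that reading, the field records the last file index the provider has evaluated, whether or not the evaluation ended up compacting anything. A minimal sketch of that bookkeeping, assuming hypothetical names (`ListingEntry`, `CompactionBookkeeping.evaluate`) and a caller-supplied function that reports whether any files were rewritten:

```scala
// Hypothetical listing entry carrying only the fields relevant here.
final case class ListingEntry(path: String, lastIndex: Long, lastEvaluatedForCompaction: Option[Long])

object CompactionBookkeeping {
  // Returns the updated entry and whether any files were rewritten.
  def evaluate(entry: ListingEntry, tryCompact: Long => Boolean): (ListingEntry, Boolean) =
    if (entry.lastEvaluatedForCompaction.forall(_ < entry.lastIndex)) {
      val compacted = tryCompact(entry.lastIndex)
      // Advance even when compaction was judged not worth it, so the same
      // index is not evaluated again on the next scan.
      (entry.copy(lastEvaluatedForCompaction = Some(entry.lastIndex)), compacted)
    } else {
      (entry, false) // this index was already evaluated
    }
}
```

This matches the test's expectation that the value is `Some(2)` after file 2 is written, even though only file 1 was compacted.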
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r36807

## File path: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ##
@@ -1362,6 +1365,110 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     }
   }
+  test("compact event log files") {
+    def verifyEventLogFiles(
+        fs: FileSystem,
+        rootPath: String,
+        expectedIndexForCompact: Option[Long],
+        expectedIndicesForNonCompact: Seq[Long]): Unit = {
+      val reader = EventLogFileReader(fs, new Path(rootPath)).get
+      var logFiles = reader.listEventLogFiles
+
+      expectedIndexForCompact.foreach { idx =>
+        val headFile = logFiles.head
+        assert(EventLogFileWriter.isCompacted(headFile.getPath))
+        assert(idx == RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName))
+        logFiles = logFiles.drop(1)
+      }
+
+      assert(logFiles.size === expectedIndicesForNonCompact.size)
+
+      logFiles.foreach { logFile =>
+        assert(RollingEventLogFilesWriter.isEventLogFile(logFile))
+        assert(!EventLogFileWriter.isCompacted(logFile.getPath))
+      }
+
+      val indices = logFiles.map { logFile =>
+        RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName)
+      }
+      assert(expectedIndicesForNonCompact === indices)
+    }
+
+    withTempDir { dir =>
+      val conf = createTestConf()
+      conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+      conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+      conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val provider = new FsHistoryProvider(conf)
+
+      val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+      writer.start()
+
+      // writing event log file 1 - don't compact for now
+      writeEventsToRollingWriter(writer, Seq(
+        SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, None, Seq(1))
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastIndexToRunCompaction === Some(1))
+      }
+
+      // writing event log file 2 - compact the event log file 1 into 1.compact
+      writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+      writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1),
+        SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false)
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastIndexToRunCompaction === Some(2))

Review comment:
From that comment, it seems that what you're saying is that, in this case, the log with index "2" isn't compacted yet, but it has been processed and found not yet worth compacting. Is that right?

I think the variable name is confusing because it mixes "last" (which to me evokes something that has already happened) and "to run" (which evokes a future action). So I can't really parse the meaning of the name, since it seems to contradict itself.
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r368086444

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##
@@ -197,8 +197,6 @@ package object config {
   private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN =
     ConfigBuilder("spark.eventLog.rolling.maxFilesToRetain")

Review comment:
It makes sense to keep the score threshold internal. We can make it public later if there's a good reason for it.
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r368086161

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -661,26 +691,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       reader: EventLogFileReader,
       scanTime: Long,
       enableOptimizations: Boolean): Unit = {
+    val rootPath = reader.rootPath
     try {
+      val (shouldReload, lastCompactionIndex) = compact(reader)

Review comment:
I was thinking that, instead of doing this inline here, you could just submit a task to `replayExecutor` after the listing data is updated, so that you give other tasks fetching listing data a chance to run before you try compaction. (You'd call `endProcessing()` at the end of that separate task.)

Any reason why that would not work?
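A minimal sketch of that ordering, assuming a single-threaded executor standing in for `replayExecutor`. The object and method names are hypothetical. The listing task finishes its own work and only then enqueues compaction as a separate task, so listing tasks that are already queued run before any compaction.

```scala
import java.util.concurrent.ExecutorService

object ListingThenCompaction {
  // Hypothetical listing task: update the listing data first, then queue
  // compaction as its own task instead of running it inline.
  def mergeListing(path: String, executor: ExecutorService, record: String => Unit): Unit = {
    record(s"listing:$path")
    executor.submit(new Runnable {
      override def run(): Unit = record(s"compact:$path")
    })
  }
}
```

With two apps queued for listing, the executor in this sketch runs both listing steps before either compaction step.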
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367137498

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -569,6 +570,35 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
+  /**
+   * Returns a tuple containing two values. Each element means:
+   * - 1st (Boolean): true if the list of event log files are changed, false otherwise.
+   * - 2nd (Option[Long]): Some(value) if the method succeeds to try compaction,
+   *   value indicates the last event log index to try compaction. None otherwise.
+   */
+  private def compact(reader: EventLogFileReader): (Boolean, Option[Long]) = {
+    reader.lastIndex match {
+      case Some(lastIndex) =>
+        try {
+          val info = listing.read(classOf[LogInfo], reader.rootPath.toString)
+          if (info.lastIndexToRunCompaction.isEmpty ||
+            info.lastIndexToRunCompaction.get < lastIndex) {

Review comment:
indent more
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367137936

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -569,6 +570,35 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
+  /**
+   * Returns a tuple containing two values. Each element means:
+   * - 1st (Boolean): true if the list of event log files are changed, false otherwise.
+   * - 2nd (Option[Long]): Some(value) if the method succeeds to try compaction,
+   *   value indicates the last event log index to try compaction. None otherwise.

Review comment:
What does it mean to "try compaction"? Does it mean that when this method returns, no compaction was actually done, it was just tried?

The tuple being returned sounds a bit confusing. Instead, why not return just an `Option[Long]` telling you both whether compaction ran and what the index of the first non-compacted file is (or of the last compacted file; I'm not sure what's really being tracked)?
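A sketch of the single-`Option` return type suggested above, assuming hypothetical names: `CompactionResultSketch.compact`, and a `runCompactor` callback standing in for the file compactor. In this sketch, `Some(index)` means the compactor was run for files up to `index` during this call, and `None` means nothing was done.

```scala
object CompactionResultSketch {
  // Some(idx): the compactor was run for files up to idx during this call.
  // None: there were no files, or idx had already been evaluated.
  def compact(lastIndex: Option[Long], lastEvaluated: Option[Long])(
      runCompactor: Long => Unit): Option[Long] =
    lastIndex
      .filter(idx => lastEvaluated.forall(_ < idx))
      .map { idx =>
        runCompactor(idx)
        idx
      }
}
```

The caller can then decide whether to reload from `isDefined`, and store the returned index directly, without a separate Boolean.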
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367142717

## File path: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ##
@@ -661,26 +691,33 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       reader: EventLogFileReader,
       scanTime: Long,
       enableOptimizations: Boolean): Unit = {
+    val rootPath = reader.rootPath
     try {
+      val (shouldReload, lastCompactionIndex) = compact(reader)

Review comment:
Is there any way to do compaction as a separate task? Otherwise, it seems like this could slow down the creation of the app listing, especially if you're not using disk caching or it's the first time the SHS is creating the listing db.
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367146348

## File path: core/src/main/scala/org/apache/spark/internal/config/package.scala ##
@@ -197,8 +197,6 @@ package object config {
   private[spark] val EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN =
     ConfigBuilder("spark.eventLog.rolling.maxFilesToRetain")

Review comment:
Hmm... this is kind of a history server config, right? Or, even more specifically, a `FsHistoryProvider` config. It might be better to group it with the other configs in `History.scala` and make it a constructor argument of `EventLogFileCompactor`.

(In fact, the same applies to `EVENT_LOG_COMPACTION_SCORE_THRESHOLD`. Should that no longer be `internal()`, too?)
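A minimal sketch of passing the settings in through the constructor rather than having the compactor read a global config, assuming a simplified, hypothetical `CompactorSketch` (not the real `EventLogFileCompactor` signature). The provider would read both values once from its history server configuration and hand over plain values, which also makes the class easy to test in isolation.

```scala
// Hypothetical, simplified compactor that receives its settings as constructor arguments.
final class CompactorSketch(val maxFilesToRetain: Int, val compactionScoreThreshold: Double) {
  require(maxFilesToRetain > 0, "maxFilesToRetain must be positive")

  // Indices of the files that would be compacted: all but the newest maxFilesToRetain.
  def filesToCompact(indices: Seq[Long]): Seq[Long] =
    indices.sorted.dropRight(maxFilesToRetain)

  // Compaction is considered worthwhile once the score reaches the threshold.
  def worthCompacting(score: Double): Boolean = score >= compactionScoreThreshold
}
```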
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367148994

## File path: core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala ##
@@ -1362,6 +1365,110 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
     }
   }
+  test("compact event log files") {
+    def verifyEventLogFiles(
+        fs: FileSystem,
+        rootPath: String,
+        expectedIndexForCompact: Option[Long],
+        expectedIndicesForNonCompact: Seq[Long]): Unit = {
+      val reader = EventLogFileReader(fs, new Path(rootPath)).get
+      var logFiles = reader.listEventLogFiles
+
+      expectedIndexForCompact.foreach { idx =>
+        val headFile = logFiles.head
+        assert(EventLogFileWriter.isCompacted(headFile.getPath))
+        assert(idx == RollingEventLogFilesWriter.getEventLogFileIndex(headFile.getPath.getName))
+        logFiles = logFiles.drop(1)
+      }
+
+      assert(logFiles.size === expectedIndicesForNonCompact.size)
+
+      logFiles.foreach { logFile =>
+        assert(RollingEventLogFilesWriter.isEventLogFile(logFile))
+        assert(!EventLogFileWriter.isCompacted(logFile.getPath))
+      }
+
+      val indices = logFiles.map { logFile =>
+        RollingEventLogFilesWriter.getEventLogFileIndex(logFile.getPath.getName)
+      }
+      assert(expectedIndicesForNonCompact === indices)
+    }
+
+    withTempDir { dir =>
+      val conf = createTestConf()
+      conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
+      conf.set(EVENT_LOG_ROLLING_MAX_FILES_TO_RETAIN, 1)
+      conf.set(EVENT_LOG_COMPACTION_SCORE_THRESHOLD, 0.0d)
+      val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
+      val fs = new Path(dir.getAbsolutePath).getFileSystem(hadoopConf)
+
+      val provider = new FsHistoryProvider(conf)
+
+      val writer = new RollingEventLogFilesWriter("app", None, dir.toURI, conf, hadoopConf)
+      writer.start()
+
+      // writing event log file 1 - don't compact for now
+      writeEventsToRollingWriter(writer, Seq(
+        SparkListenerApplicationStart("app", Some("app"), 0, "user", None),
+        SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, None, Seq(1))
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastIndexToRunCompaction === Some(1))
+      }
+
+      // writing event log file 2 - compact the event log file 1 into 1.compact
+      writeEventsToRollingWriter(writer, Seq.empty, rollFile = true)
+      writeEventsToRollingWriter(writer, Seq(SparkListenerUnpersistRDD(1),
+        SparkListenerJobEnd(1, 1, JobSucceeded)), rollFile = false)
+
+      updateAndCheck(provider) { _ =>
+        verifyEventLogFiles(fs, writer.logPath, Some(1), Seq(2))
+        val info = provider.listing.read(classOf[LogInfo], writer.logPath)
+        assert(info.lastIndexToRunCompaction === Some(2))

Review comment:
I'm a little confused by this check. `2` was not compacted, right? Only `1` was. So why is `lastIndexToRunCompaction` 2?
[GitHub] [spark] vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
vanzin commented on a change in pull request #27208: [SPARK-30481][CORE] Integrate event log compactor into Spark History Server
URL: https://github.com/apache/spark/pull/27208#discussion_r367149532

## File path: docs/configuration.md ##
@@ -1023,6 +1023,24 @@ Apart from these, the following properties are also available, and may be useful
     The max size of event log file before it's rolled over.
+
+  spark.eventLog.rolling.maxFilesToRetain
+  Int.MaxValue
+
+    The maximum number of event log files which will be retained as non-compacted.
+    By default, all event log files will be retained. Please set the configuration and

Review comment:
It might be good to clarify where to configure things, since this config is for the history server, but `spark.eventLog.rolling.maxFileSize` needs to be set in the applications.
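To illustrate the split that the docs could spell out, here is a sketch of which side owns which setting, written as plain Scala maps rather than through any Spark API. The keys are the ones mentioned in this thread (using the later `spark.history.fs.` name for the retention setting), plus `spark.eventLog.enabled` and `spark.eventLog.rolling.enabled`, which applications need for rolling event logs. The values are only examples.

```scala
object EventLogSettingsSketch {
  // Set in each application's configuration: the writer decides when to roll files.
  val applicationSide: Map[String, String] = Map(
    "spark.eventLog.enabled" -> "true",
    "spark.eventLog.rolling.enabled" -> "true",
    "spark.eventLog.rolling.maxFileSize" -> "128m")

  // Set in the Spark History Server's configuration: the reader decides what to compact.
  val historyServerSide: Map[String, String] = Map(
    "spark.history.fs.eventLog.rolling.maxFilesToRetain" -> "2")
}
```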