[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25464696

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -44,8 +48,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

   private val NOT_STARTED = "<Not Started>"

   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
-    conf.getInt("spark.history.updateInterval", 10)) * 1000
+  private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.fs.updateInterval", true)))
--- End diff --

We shouldn't use `translateConfKey` here, because `conf.getOption` already does it for us. We should really make `translateConfKey` private, but we can do that in a separate patch.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25465389

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -44,8 +48,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

   private val NOT_STARTED = "<Not Started>"

   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
-    conf.getInt("spark.history.updateInterval", 10)) * 1000
+  private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.fs.updateInterval", true)))
+    .orElse(conf.getOption(SparkConf.translateConfKey("spark.history.updateInterval", true)))
--- End diff --

We shouldn't call `translateConfKey` here, but I realize we need to do so if we want to warn the user. I will submit a separate patch to fix this behavior. In general I think the `translateConfKey` method should be private to `SparkConf`.
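The fallback chain under discussion resolves the newest key first, then each deprecated key, then a default. A minimal sketch of that `Option`-based precedence, with `SparkConf` stood in by a plain `Map[String, String]` and the `translateConfKey`/deprecation-warning machinery omitted (names mirror the diff but this is an illustration, not the patch's code):

```scala
// Hypothetical model of the key-precedence logic: newest key wins, then each
// deprecated key in turn, then the default of 10 seconds.
object UpdateIntervalSketch {
  def resolveUpdateIntervalSeconds(conf: Map[String, String]): Long = {
    conf.get("spark.history.fs.update.interval.seconds")
      .orElse(conf.get("spark.history.fs.updateInterval"))   // deprecated key
      .orElse(conf.get("spark.history.updateInterval"))      // older deprecated key
      .map(_.toLong)
      .getOrElse(10L)                                        // default: 10 seconds
  }
}
```

`orElse` only consults the next key when the previous lookup returned `None`, which is why the real code does not need to call `translateConfKey` itself when `getOption` already translates.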
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/4214
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-76276422

LGTM. I'm merging this into master, and I will file a new issue for the conf key problem I mentioned. Thanks for your patience @viper-kun!
Github user viper-kun commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-76109919

@andrewor14 thanks for your review. Please retest it; I cannot get the test log.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-76110309

[Test build #27974 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27974/consoleFull) for PR 4214 at commit [`31674ee`](https://github.com/apache/spark/commit/31674ee308689c29da96c0325adb2774bf72c353).
* This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-76115296

[Test build #27974 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27974/consoleFull) for PR 4214 at commit [`31674ee`](https://github.com/apache/spark/commit/31674ee308689c29da96c0325adb2774bf72c353).
* This patch **fails Spark unit tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-76115303

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27974/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-76128088

[Test build #27988 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27988/consoleFull) for PR 4214 at commit [`7a5b9c5`](https://github.com/apache/spark/commit/7a5b9c5c5337bd6a445b5a6551bb5bff6201f066).
* This patch merges cleanly.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-76134530

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27988/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-76134522

[Test build #27988 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27988/consoleFull) for PR 4214 at commit [`7a5b9c5`](https://github.com/apache/spark/commit/7a5b9c5c5337bd6a445b5a6551bb5bff6201f066).
* This patch **passes all tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-75933886

[Test build #27947 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27947/consoleFull) for PR 4214 at commit [`6e3d06b`](https://github.com/apache/spark/commit/6e3d06b0f282585f45eb72fa3b03a09ffe343729).
* This patch merges cleanly.
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-75933991

[Test build #27947 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27947/consoleFull) for PR 4214 at commit [`6e3d06b`](https://github.com/apache/spark/commit/6e3d06b0f282585f45eb72fa3b03a09ffe343729).
* This patch **fails Scala style tests**.
* This patch merges cleanly.
* This patch adds no public classes.
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-75933995

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27947/
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-75325198

[Test build #27791 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27791/consoleFull) for PR 4214 at commit [`70c28d6`](https://github.com/apache/spark/commit/70c28d671ee60bd04554b6709956df54aab8b64a).
* This patch **does not merge cleanly**.
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109304

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -230,6 +250,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   }

   /**
+   * Delete event logs from the log directory according to the clean policy defined by the user.
+   */
+  private def cleanLogs() = {
--- End diff --

`: Unit` (see https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide)
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109562

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -230,6 +250,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   }

   /**
+   * Delete event logs from the log directory according to the clean policy defined by the user.
+   */
+  private def cleanLogs() = {
+    try {
+      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+        .getOrElse(Seq[FileStatus]())
+      val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
+        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+      val now = System.currentTimeMillis()
+      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
--- End diff --

`: Unit`
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109254

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -73,27 +103,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"

   /**
-   * A background thread that periodically checks for event log updates on disk.
-   *
-   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
-   * time at which it performs the next log check to maintain the same period as before.
-   *
-   * TODO: Add a mechanism to update manually.
+   * A background thread that periodically do something about event log.
--- End diff --

this comment is super vague! How about

```
/**
 * Return a runnable that performs the given operation on the event logs.
 * This operation is expected to be executed periodically.
 */
```
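The suggested doc comment describes a small factory that wraps a periodic operation. A minimal sketch of that shape, assuming a `getRunner`-style helper (the name and the scheduler wiring are illustrative, not the exact code from the patch):

```scala
// Sketch: wrap an arbitrary operation on the event logs in a Runnable so a
// scheduler (e.g. a ScheduledExecutorService) can invoke it periodically.
object RunnerSketch {
  def getRunner(operateFun: () => Unit): Runnable = new Runnable {
    override def run(): Unit = operateFun()
  }
}
```

A caller would then schedule, say, `getRunner(() => checkForLogs())` and `getRunner(() => cleanLogs())` at their respective intervals.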
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109374

--- Diff: docs/monitoring.md ---
@@ -144,6 +144,29 @@ follows:
       If disabled, no access control checks are made.
     </td>
   </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.enable</td>
--- End diff --

elsewhere we use `enabled` (e.g. `spark.eventLog.enabled`)
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109930

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -230,6 +250,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   }

   /**
+   * Delete event logs from the log directory according to the clean policy defined by the user.
+   */
+  private def cleanLogs() = {
+    try {
+      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+        .getOrElse(Seq[FileStatus]())
+      val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
+        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+      val now = System.currentTimeMillis()
+      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
+        if (now - info.lastUpdated <= maxAge) {
+          newApps += (info.id -> info)
+        }
+      }
+
+      val oldIterator = applications.values.iterator.buffered
+      oldIterator.foreach(addIfNotExpire)
+
+      applications = newApps
+
+      // Scan all logs from the log directory.
+      // Only directories older than now - maxAge milliseconds mill will be deleted
+      statusList.foreach { dir =>
+        try {
+          if (now - getModificationTime(dir) > maxAge) {
+            fs.delete(dir.getPath, true)
--- End diff --

can you add a quick comment of what `true` means here:

```
fs.delete(dir.getPath, true /* recursive */)
```
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-75336100

[Test build #27791 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/27791/consoleFull) for PR 4214 at commit [`70c28d6`](https://github.com/apache/spark/commit/70c28d671ee60bd04554b6709956df54aab8b64a).
* This patch **fails Spark unit tests**.
* This patch **does not merge cleanly**.
* This patch adds no public classes.
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25108992

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -43,9 +47,33 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val NOT_STARTED = "<Not Started>"

+  // One day
+  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
+
+  // One week
+  private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
--- End diff --

should these constants be put in the `object FsHistoryProvider` instead?
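A sketch of the suggestion: defaults hoisted into a companion-style object so they are computed once rather than per instance (the object name here is illustrative):

```scala
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.Duration

// Hypothetical companion-object home for the defaults from the diff.
object FsHistoryProviderDefaults {
  // One day, in seconds
  val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
  // One week, in seconds
  val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
}
```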
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109452

--- Diff: docs/monitoring.md ---
@@ -144,6 +144,29 @@ follows:
       If disabled, no access control checks are made.
     </td>
   </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.enable</td>
+    <td>false</td>
+    <td>
+      Specifies whether the History Server should periodically clean up event logs from storage.
+    </td>
+  </tr>
+  <tr>
+    <td>spark.history.fs.cleaner.interval.seconds</td>
+    <td>86400</td>
+    <td>
+      How often the job history cleaner checks for files to delete, in seconds. Defaults to 864000 (one day).
--- End diff --

86400 (extra 0)
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109082

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -73,27 +103,15 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"

   /**
-   * A background thread that periodically checks for event log updates on disk.
-   *
-   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
-   * time at which it performs the next log check to maintain the same period as before.
-   *
-   * TODO: Add a mechanism to update manually.
+   * A background thread that periodically do something about event log.
--- End diff --

this comment is super vague! What is the something that this thread does?
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109806

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -230,6 +250,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   }

   /**
+   * Delete event logs from the log directory according to the clean policy defined by the user.
+   */
+  private def cleanLogs() = {
+    try {
+      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+        .getOrElse(Seq[FileStatus]())
+      val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
+        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+      val now = System.currentTimeMillis()
+      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
+        if (now - info.lastUpdated <= maxAge) {
+          newApps += (info.id -> info)
+        }
+      }
+
+      val oldIterator = applications.values.iterator.buffered
+      oldIterator.foreach(addIfNotExpire)
--- End diff --

what's the point of (1) storing it in a variable here if we just use it right away, (2) using a buffered iterator, and (3) defining `addIfNotExpired`? I think we can just do

```
// Retain applications that have not expired
applications.values.foreach { info =>
  if (now - info.lastUpdated <= maxAge) {
    newApps += (info.id -> info)
  }
}
```

I believe this change simplifies the code without sacrificing any readability
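The retention pass being discussed can be modeled in isolation. A simplified sketch, with `AppInfo` standing in for `FsApplicationHistoryInfo` (only the fields the logic touches are kept, and `maxAgeMs` is passed in rather than read from config):

```scala
import scala.collection.mutable

// Hypothetical, trimmed-down stand-in for FsApplicationHistoryInfo.
case class AppInfo(id: String, lastUpdated: Long)

object RetentionSketch {
  // Keep only applications whose last update is within maxAgeMs of `now`,
  // preserving insertion order as the original LinkedHashMap does.
  def retainUnexpired(apps: Seq[AppInfo], now: Long, maxAgeMs: Long): mutable.LinkedHashMap[String, AppInfo] = {
    val appsToRetain = new mutable.LinkedHashMap[String, AppInfo]()
    apps.foreach { info =>
      if (now - info.lastUpdated <= maxAgeMs) {
        appsToRetain += (info.id -> info)
      }
    }
    appsToRetain
  }
}
```

This mirrors the reviewer's suggested inline loop: one pass, no intermediate iterator, no named helper.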
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109839

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -230,6 +250,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   }

   /**
+   * Delete event logs from the log directory according to the clean policy defined by the user.
+   */
+  private def cleanLogs() = {
+    try {
+      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+        .getOrElse(Seq[FileStatus]())
+      val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
+        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+      val now = System.currentTimeMillis()
+      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
--- End diff --

not really `newApp`. More like `appsToRetain`
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-75334552

@viper-kun Thanks for re-opening this, and my apologies for having neglected the older patch. I left a few comments inline, most of which are minor ones that have to do with naming and docs. Otherwise this patch looks fairly straightforward and I would like to get it merged soon.
Github user andrewor14 commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-75324440

ok to test
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109674

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -230,6 +250,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   }

   /**
+   * Delete event logs from the log directory according to the clean policy defined by the user.
+   */
+  private def cleanLogs() = {
+    try {
+      val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq)
+        .getOrElse(Seq[FileStatus]())
+      val maxAge = conf.getLong("spark.history.fs.cleaner.maxAge.seconds",
+        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+      val now = System.currentTimeMillis()
+      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
+        if (now - info.lastUpdated <= maxAge) {
+          newApps += (info.id -> info)
+        }
+      }
+
+      val oldIterator = applications.values.iterator.buffered
+      oldIterator.foreach(addIfNotExpire)
+
+      applications = newApps
+
+      // Scan all logs from the log directory.
+      // Only directories older than now - maxAge milliseconds mill will be deleted
--- End diff --

```
// Only directories older than the specified max age will be deleted
```
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109639 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -230,6 +250,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } /** + * Delete event logs from the log directory according to the clean policy defined by the user. + */ + private def cleanLogs() = { +try { + val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) +.getOrElse(Seq[FileStatus]()) + val maxAge = conf.getLong(spark.history.fs.cleaner.maxAge.seconds, +DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000 + + val now = System.currentTimeMillis() + val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + def addIfNotExpire(info: FsApplicationHistoryInfo) = { --- End diff -- also, `addIfNotExpired`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109592 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -230,6 +250,45 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } /** + * Delete event logs from the log directory according to the clean policy defined by the user. + */ + private def cleanLogs() = { +try { + val statusList = Option(fs.listStatus(new Path(logDir))).map(_.toSeq) +.getOrElse(Seq[FileStatus]()) + val maxAge = conf.getLong(spark.history.fs.cleaner.maxAge.seconds, +DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000 + + val now = System.currentTimeMillis() + val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + def addIfNotExpire(info: FsApplicationHistoryInfo) = { +if (now - info.lastUpdated = maxAge) { + newApps += (info.id - info) +} + } + + val oldIterator = applications.values.iterator.buffered + oldIterator.foreach(addIfNotExpire) + + applications = newApps + + // Scan all logs from the log directory. + // Only directories older than now maxAge milliseconds mill will be deleted + statusList.foreach { dir = +try { + if (now - getModificationTime(dir) maxAge) { +fs.delete(dir.getPath, true) + } +} catch { + case t: IOException = logError(sIOException in cleaning logs of $dir, t) --- End diff -- one too many spaces here `cleaning logs of $dir` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-75336108 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/27791/ Test FAILed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25106244 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -43,9 +47,33 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val NOT_STARTED = Not Started + // One day + private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds + + // One week + private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds + + private def warnUpdateInterval(key: String, value: String): String = { +logWarning(sUsing $key to set interval + + between each check for event log updates is deprecated, + + please use spark.history.fs.update.interval.seconds instead.) +value + } + + private def getDeprecatedConfig(conf: SparkConf, key: String): Option[String] = { +conf.getOption(key).map(warnUpdateInterval(key, _)) + } --- End diff -- can you do all of these through `SparkConf.deprecatedConfigs` instead of doing it here? You may need to rebase to master to get those changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109482 --- Diff: docs/monitoring.md --- @@ -144,6 +144,29 @@ follows: If disabled, no access control checks are made. /td /tr + tr +tdspark.history.fs.cleaner.enable/td +tdfalse/td +td + Specifies whether the History Server should periodically clean up event logs from storage. +/td + /tr + tr +tdspark.history.fs.cleaner.interval.seconds/td +td86400/td +td + How often the job history cleaner checks for files to delete, in seconds. Defaults to 864000 (one day). + Files are only deleted if they are older than spark.history.fs.cleaner.maxAge.seconds. +/td + /tr + tr +tdspark.history.fs.cleaner.maxAge.seconds/td +td604800/td --- End diff -- can you break this down to `3600 * 24 * 7` here instead so it's immediately clear that it's a week? (same in L167) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25109543 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -53,8 +81,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf)) - // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTimeMs = -1L + // Scheduled thread pool size must be one, otherwise it will have concurrent issues about fs + // and applications between check task and clean task. + private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() --- End diff -- how about some comment on what this pool is used for --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user andrewor14 commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r25110141 --- Diff: docs/monitoring.md --- @@ -85,7 +85,7 @@ follows: /td /tr tr -tdspark.history.fs.updateInterval/td +tdspark.history.fs.interval.seconds/td --- End diff -- I believe this should be `spark.history.fs.update.interval.seconds` instead. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user viper-kun commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r23750085 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -163,9 +179,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * applications that haven't been updated since last time the logs were checked. */ private[history] def checkForLogs(): Unit = { -lastLogCheckTimeMs = getMonotonicTimeMs() -logDebug(Checking for logs. Time is now %d..format(lastLogCheckTimeMs)) --- End diff -- checkForlogs thread is Scheduled by pool. lastLogCheckTimeMs is no use and remove it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user viper-kun commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r23750759 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -113,12 +129,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis Logging directory specified is not a directory: %s.format(logDir)) } -checkForLogs() +// A task that periodically checks for event log updates on disk. +pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS) -// Disable the background thread during tests. -if (!conf.contains(spark.testing)) { --- End diff -- I got it. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user viper-kun commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r23750788 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -53,8 +79,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf)) - // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTimeMs = -1L + // The schedule thread pool size must be one, otherwise it will have concurrent issues about fs + // and applications between check task and clean task.. --- End diff -- Sorry, i don't know what you means? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user viper-kun closed the pull request at: https://github.com/apache/spark/pull/2471 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-71702876 @viper-kun could you close this one in that case? thanks! --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-71743602 LGTM aside from some very minor things. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r23649795 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -43,9 +47,31 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val NOT_STARTED = Not Started + // One day + private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds + + // One week + private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds + + private def warnUpdateInterval(key: String, value: String): String = { +logWarning(sUsing $key to set interval + + between each check for event log updates is deprecated, + + please use spark.history.fs.update.interval.seconds instead.) +value + } + // Interval between each check for event log updates - private val UPDATE_INTERVAL_MS = conf.getInt(spark.history.fs.updateInterval, -conf.getInt(spark.history.updateInterval, 10)) * 1000 + private val UPDATE_INTERVAL_MS = conf.getOption(spark.history.fs.update.interval.seconds) +.orElse(conf.getOption(spark.history.fs.updateInterval) +.map(warnUpdateInterval(spark.history.fs.updateInterval, _))) --- End diff -- Just for neatness, I'd have a single method that does both `conf.getOption()` and `.map()`: private def getDeprecatedConfig(conf: SparkConf, key: String): Option[String] Then just do: conf.getOption(newKey).orElse(getDeprecatedKey(old1)).orElse(getDeprecatedKey(old2)) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r23649826 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -53,8 +79,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf)) - // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTimeMs = -1L + // The schedule thread pool size must be one, otherwise it will have concurrent issues about fs --- End diff -- super nit: scheduled --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r23652781 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -113,12 +129,12 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis Logging directory specified is not a directory: %s.format(logDir)) } -checkForLogs() +// A task that periodically checks for event log updates on disk. +pool.scheduleAtFixedRate(getRunner(checkForLogs), 0, UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS) -// Disable the background thread during tests. -if (!conf.contains(spark.testing)) { --- End diff -- I don't know why this was turned off for testing, but it doesn't seem like you want to change that behavior, do you? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r23649845 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -53,8 +79,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val fs = Utils.getHadoopFileSystem(logDir, SparkHadoopUtil.get.newConfiguration(conf)) - // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTimeMs = -1L + // The schedule thread pool size must be one, otherwise it will have concurrent issues about fs + // and applications between check task and clean task.. --- End diff -- nit: too many periods --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r23652649 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -163,9 +179,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * applications that haven't been updated since last time the logs were checked. */ private[history] def checkForLogs(): Unit = { -lastLogCheckTimeMs = getMonotonicTimeMs() -logDebug(Checking for logs. Time is now %d..format(lastLogCheckTimeMs)) --- End diff -- why are you removing the logging? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/4214#discussion_r23653061 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -163,9 +179,6 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis * applications that haven't been updated since last time the logs were checked. */ private[history] def checkForLogs(): Unit = { -lastLogCheckTimeMs = getMonotonicTimeMs() -logDebug(Checking for logs. Time is now %d..format(lastLogCheckTimeMs)) --- End diff -- Good point. The tests don't expect anything to be happening in the background, instead they rely on being able to trigger these updates explicitly. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
GitHub user viper-kun opened a pull request: https://github.com/apache/spark/pull/4214 [SPARK-3562]Periodic cleanup event logs You can merge this pull request into a Git repository by running: $ git pull https://github.com/viper-kun/spark cleaneventlog Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/4214.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4214 commit adcfe869863cd5f73b06143d41f138ea3b8d145f Author: xukun 00228947 xukun...@huawei.com Date: 2015-01-27T01:30:41Z Periodic cleanup event logs --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user viper-kun commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-71575458 I have file a new pr #4214 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/4214#issuecomment-71575569 Can one of the admins verify this patch? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user suyanNone commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-71309691 @vanzin = =! I got it, sigh~ --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-71058558 I'm not a committer so I can't merge the patch. But it has merge conflicts now, so that at least needs to be fixed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user suyanNone commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-70964880 Is thi patch ok to merge? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r20600235 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -46,8 +72,10 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis private val fs = Utils.getHadoopFileSystem(resolvedLogDir, SparkHadoopUtil.get.newConfiguration(conf)) - // A timestamp of when the disk was last accessed to check for log updates - private var lastLogCheckTimeMs = -1L + // The schedule thread pool size must be one, otherwise it will have concurrent issues about fs + // and applications between check task and clean task.. + private val pool = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() --- End diff -- minor: you could use `Utils.namedThreadFactory()` here (just noticed that method the other day). --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-63698395 LGTM. Everybody else is kinda busy with releases so I doubt they'll look at this in the next several days... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user viper-kun commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-60541215 @vanzin @andrewor14 @srowen . is it ok to go? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r19179055 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -210,7 +226,46 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis applications = newApps } } catch { - case t: Throwable = logError(Exception in checking for event log updates, t) + case t: Exception = logError(Exception in checking for event log updates, t) +} + } + + /** + * Delete event logs from the log directory according to the clean policy defined by the user. + */ + private def cleanLogs() = { +try { + val logStatus = fs.listStatus(new Path(resolvedLogDir)) + val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]() + val maxAge = conf.getLong(spark.history.fs.maxAge.seconds, +DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000 + + val now = System.currentTimeMillis() + val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]() + def addIfNotExpire(info: FsApplicationHistoryInfo) = { +if (now - info.lastUpdated = maxAge) { + newApps += (info.id - info) +} + } + + val oldIterator = applications.values.iterator.buffered + oldIterator.foreach(addIfNotExpire) + + applications = newApps + + // Scan all logs from the log directory. + // Only directories older than now maxAge milliseconds mill will be deleted + logDirs.foreach { dir = +try{ + if (now - getModificationTime(dir) maxAge) { +fs.delete(dir.getPath, true) + } +} catch { + case t: IOException = logError(IOException in cleaning logs, t) --- End diff -- nit: add `$dir` to the log message, in case it does not show up in the exception. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r19179121

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -29,14 +31,36 @@
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils

+import com.google.common.util.concurrent.ThreadFactoryBuilder

--- End diff --

nit: this should come before the `org.hadoop.` import (and be in the same group).
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r19179186

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -29,14 +31,36 @@
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.Utils

+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
 private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   with Logging {

   private val NOT_STARTED = "<Not Started>"

+  // One day
+  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
+
+  // One week
+  private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
+
+  private def warnUpdateInterval(value: String): String = {
+    logWarning("Using spark.history.fs.updateInterval to set interval " +
+      "between each check for event log updates is deprecated, " +
+      "please use spark.history.fs.update.interval.seconds instead.")
+    value
+  }
+
   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",
-    conf.getInt("spark.history.updateInterval", 10)) * 1000
+  private val UPDATE_INTERVAL_MS = conf.getOption("spark.history.fs.update.interval.seconds")
+    .orElse(conf.getOption("spark.history.fs.updateInterval").map(warnUpdateInterval))
+    .orElse(conf.getOption("spark.history.updateInterval"))

--- End diff --

nit: could warn here too.
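The fallback-with-deprecation-warning pattern under discussion can be sketched outside of Spark. In the snippet below a plain `Map` stands in for `SparkConf`, and the key names follow the diff; this is a sketch of the idea, not Spark's actual code:

```scala
// Sketch of the deprecated-key fallback chain from the diff above.
// A plain Map stands in for SparkConf; the warning only fires when a
// deprecated key is the one that actually supplies the value.
object ConfFallbackSketch {
  // hypothetical user configuration: only the deprecated key is set
  val conf = Map("spark.history.fs.updateInterval" -> "30")

  def warnDeprecated(key: String)(value: String): String = {
    Console.err.println(s"$key is deprecated, " +
      "use spark.history.fs.update.interval.seconds instead")
    value
  }

  // Preferred key first, then the deprecated keys, then a default of 10s.
  val updateIntervalMs: Long =
    conf.get("spark.history.fs.update.interval.seconds")
      .orElse(conf.get("spark.history.fs.updateInterval")
        .map(warnDeprecated("spark.history.fs.updateInterval")))
      .orElse(conf.get("spark.history.updateInterval")
        .map(warnDeprecated("spark.history.updateInterval")))
      .getOrElse("10").toLong * 1000
}
```

With the map above the first deprecated key supplies the value, so the lookup warns and resolves to 30 seconds.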
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-59998653 @viper-kun lgtm, but you'll need to get the attention of a committer. :-)
Github user viper-kun commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-60027733 @vanzin @andrewor14. is it ok to go?
Github user viper-kun commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-59648648 @vanzin. is it ok to go?
Github user viper-kun commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-59164199 @vanzin, is it ok to go?
Github user mattf commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-59191797

> Well, good luck with adding something like that to HDFS... that is not the responsibility of filesystems.

just so we're on the same page, i'm not advocating adding this functionality to HDFS. i believe it should be separate functionality that doesn't live in a spark process, as it's an operational activity.

> a directory that is completely managed within Spark (the event log directory) shouldn't also be cleaned up by Spark

we can agree to disagree. you can view a trace log as allocating some amount of disk space that you then have to manage, similar to memory allocations in a program. however, doing that management is more involved than periodically rotating. if you're in nyc for strata we should grab a beer and debate the finer points of resource management.
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-59231360 @mattf Always in for a beer but unfortunately I'm not in NY... Also, you mention rotating a lot. This is not rotating. This is cleaning up, as in deleting existing logs. Having an external process do that for you complicates the History Server, because now there's the possibility that it will serve stale data - links to jobs that have been cleaned up by that external process, which will result in an error in the UI. (Which is why I mentioned above that an external sweeper without inotify is a no-go.) Also, this is not a trace log. This is application history information. This is not your Linux syslog or Windows event log. This is very application-specific.
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-59231417 @viper-kun haven't had a chance to look at the diff again, but it seems there are merge conflicts now.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18934400

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -34,9 +36,19 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val NOT_STARTED = "<Not Started>"

+  // One day
+  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
+
+  // One week
+  private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
+
   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",

--- End diff --

You can't just remove the existing setting. You need to still read it for backwards compatibility.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18934416

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -34,9 +36,19 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val NOT_STARTED = "<Not Started>"

+  // One day
+  private val DEFAULT_SPARK_HISTORY_FS_CLEANER_INTERVAL_S = Duration(1, TimeUnit.DAYS).toSeconds
+
+  // One week
+  private val DEFAULT_SPARK_HISTORY_FS_MAXAGE_S = Duration(7, TimeUnit.DAYS).toSeconds
+
   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_MS = conf.getInt("spark.history.fs.updateInterval",

--- End diff --

(Bonus for printing a warning log if the old setting is used.)
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18934425

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -46,8 +58,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
     SparkHadoopUtil.get.newConfiguration(conf))

-  // A timestamp of when the disk was last accessed to check for log updates
-  private var lastLogCheckTimeMs = -1L
+  // The schedule thread pool size must be one,otherwise it will have concurrent issues about fs

--- End diff --

nit: space after comma
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18934463

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -60,29 +73,23 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     = new mutable.LinkedHashMap()

   /**
-   * A background thread that periodically checks for event log updates on disk.
-   *
-   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
-   * time at which it performs the next log check to maintain the same period as before.
-   *
-   * TODO: Add a mechanism to update manually.
+   * A background thread that periodically do something about event log.
    */
-  private val logCheckingThread = new Thread("LogCheckingThread") {
-    override def run() = Utils.logUncaughtExceptions {
-      while (true) {
-        val now = getMonotonicTimeMs()
-        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
-          Thread.sleep(UPDATE_INTERVAL_MS)
-        } else {
-          // If the user has manually checked for logs recently, wait until
-          // UPDATE_INTERVAL_MS after the last check time
-          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
-        }
-        checkForLogs()
+  private def getThread(name: String, operateFun: () => Unit): Thread = {

--- End diff --

This should return a Runnable, not a Thread, since you're submitting them to the executor. Thread works but is overkill here.
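The distinction vanzin draws here (a `Thread` is a `Runnable`, so it type-checks when handed to an executor, but its own thread machinery goes unused) can be sketched as follows; the names are illustrative, not Spark's:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Sketch: wrap a function in a plain Runnable for periodic scheduling,
// rather than constructing a Thread that the executor will never start itself.
object RunnableSketch {
  def asRunnable(operateFun: () => Unit): Runnable = new Runnable {
    override def run(): Unit =
      try operateFun()
      catch { case e: Exception => Console.err.println(s"task failed: $e") }
  }

  def main(args: Array[String]): Unit = {
    val pool = Executors.newScheduledThreadPool(1)
    // the executor supplies the thread; we supply only the work
    pool.scheduleAtFixedRate(asRunnable(() => println("checking for logs")),
      0, 10, TimeUnit.SECONDS)
    pool.shutdown()
  }
}
```

Catching inside `run()` also matters for `scheduleAtFixedRate`: an escaped exception cancels all future executions of the task.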
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18934480

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -46,8 +58,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
     SparkHadoopUtil.get.newConfiguration(conf))

-  // A timestamp of when the disk was last accessed to check for log updates
-  private var lastLogCheckTimeMs = -1L
+  // The schedule thread pool size must be one,otherwise it will have concurrent issues about fs
+  // and applications between check task and clean task..
+  private val pool = Executors.newScheduledThreadPool(1)

--- End diff --

You should use a thread factory so that you can set the thread name and daemon status. See com.google.common.util.concurrent.ThreadFactoryBuilder.
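What the review asks for (named daemon threads, so the pool never blocks JVM exit) can be sketched with a hand-rolled stdlib `ThreadFactory`; Guava's `ThreadFactoryBuilder` expresses the same thing more compactly as `new ThreadFactoryBuilder().setNameFormat("log-task-%d").setDaemon(true).build()`. The thread name here is illustrative:

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

// Sketch of a named daemon-thread factory, equivalent in effect to the
// Guava ThreadFactoryBuilder suggested above.
object DaemonFactorySketch {
  private val counter = new AtomicInteger(0)

  val factory: ThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"log-task-${counter.getAndIncrement()}")
      t.setDaemon(true) // daemon threads don't keep the JVM alive
      t
    }
  }

  val pool = Executors.newScheduledThreadPool(1, factory)
}
```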
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18934502

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -60,29 +73,23 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     = new mutable.LinkedHashMap()

   /**
-   * A background thread that periodically checks for event log updates on disk.
-   *
-   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
-   * time at which it performs the next log check to maintain the same period as before.
-   *
-   * TODO: Add a mechanism to update manually.
+   * A background thread that periodically do something about event log.
    */
-  private val logCheckingThread = new Thread("LogCheckingThread") {
-    override def run() = Utils.logUncaughtExceptions {
-      while (true) {
-        val now = getMonotonicTimeMs()
-        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
-          Thread.sleep(UPDATE_INTERVAL_MS)
-        } else {
-          // If the user has manually checked for logs recently, wait until
-          // UPDATE_INTERVAL_MS after the last check time
-          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
-        }
-        checkForLogs()
+  private def getThread(name: String, operateFun: () => Unit): Thread = {
+    val thread = new Thread(name) {
+      override def run() = Utils.logUncaughtExceptions {
+        operateFun()
       }
     }
+    thread
   }

+  // A background thread that periodically checks for event log updates on disk.
+  private val logCheckingThread = getThread("LogCheckingThread", checkForLogs)
+
+  // A background thread that periodically cleans event logs on disk.
+  private val logCleaningThread = getThread("LogCleaningThread", cleanLogs)

--- End diff --

And this?
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18934514

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -97,9 +104,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       "Logging directory specified is not a directory: %s".format(resolvedLogDir))
   }

-    checkForLogs()
     logCheckingThread.setDaemon(true)
-    logCheckingThread.start()
+    pool.scheduleAtFixedRate(logCheckingThread, 0, UPDATE_INTERVAL_MS, TimeUnit.MILLISECONDS)
+
+    // Start cleaner thread if spark.history.fs.cleaner.enable is true

--- End diff --

Comment is sort of redundant... also, you're not starting a thread, you're scheduling a task.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18934497

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -60,29 +73,23 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     = new mutable.LinkedHashMap()

   /**
-   * A background thread that periodically checks for event log updates on disk.
-   *
-   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
-   * time at which it performs the next log check to maintain the same period as before.
-   *
-   * TODO: Add a mechanism to update manually.
+   * A background thread that periodically do something about event log.
    */
-  private val logCheckingThread = new Thread("LogCheckingThread") {
-    override def run() = Utils.logUncaughtExceptions {
-      while (true) {
-        val now = getMonotonicTimeMs()
-        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
-          Thread.sleep(UPDATE_INTERVAL_MS)
-        } else {
-          // If the user has manually checked for logs recently, wait until
-          // UPDATE_INTERVAL_MS after the last check time
-          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
-        }
-        checkForLogs()
+  private def getThread(name: String, operateFun: () => Unit): Thread = {
+    val thread = new Thread(name) {
+      override def run() = Utils.logUncaughtExceptions {
+        operateFun()
       }
     }
+    thread
   }

+  // A background thread that periodically checks for event log updates on disk.
+  private val logCheckingThread = getThread("LogCheckingThread", checkForLogs)

--- End diff --

Do you need to keep references to this?
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18934524

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -210,7 +220,44 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
          applications = newApps
        }
      } catch {
-      case t: Throwable => logError("Exception in checking for event log updates", t)
+      case t: Exception => logError("Exception in checking for event log updates", t)
+    }
+  }
+
+  /**
+   * Delete event logs from the log directory according to the clean policy defined by the user.
+   */
+  private def cleanLogs() = {
+    try {
+      val logStatus = fs.listStatus(new Path(resolvedLogDir))
+      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
+      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
+        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+      val now = System.currentTimeMillis()
+
+      // Scan all logs from the log directory.
+      // Only directories older than maxAge milliseconds will be deleted
+      logDirs.foreach { dir =>
+        if (now - getModificationTime(dir) > maxAge) {
+          fs.delete(dir.getPath, true)

--- End diff --

You forgot to handle exceptions here (see my previous comments on the subject).
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18934542

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -210,7 +220,44 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
          applications = newApps
        }
      } catch {
-      case t: Throwable => logError("Exception in checking for event log updates", t)
+      case t: Exception => logError("Exception in checking for event log updates", t)
+    }
+  }
+
+  /**
+   * Delete event logs from the log directory according to the clean policy defined by the user.
+   */
+  private def cleanLogs() = {
+    try {
+      val logStatus = fs.listStatus(new Path(resolvedLogDir))
+      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
+      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
+        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+      val now = System.currentTimeMillis()
+
+      // Scan all logs from the log directory.
+      // Only directories older than maxAge milliseconds will be deleted
+      logDirs.foreach { dir =>
+        if (now - getModificationTime(dir) > maxAge) {
+          fs.delete(dir.getPath, true)
+        }
+      }
+
+      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
+        if (now - info.lastUpdated <= maxAge) {
+          newApps += (info.id -> info)
+        }
+      }
+
+      val oldIterator = applications.values.iterator.buffered
+      oldIterator.foreach(addIfNotExpire)
+
+      applications = newApps

--- End diff --

I think you should update the app list before you clean up the file system. That way, you won't be serving links to dead logs (even if only for a short period).
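The ordering vanzin recommends (prune the in-memory application map first, delete directories second, so the UI never serves links to logs that are already gone) can be sketched with simplified types; `AppInfo` and the `deleteDir` callback are stand-ins, not Spark's API:

```scala
import scala.collection.mutable

// Sketch of the clean-before-delete ordering discussed above.
object CleanOrderSketch {
  case class AppInfo(id: String, lastUpdated: Long)

  def cleanLogs(apps: mutable.LinkedHashMap[String, AppInfo],
                deleteDir: String => Unit,
                now: Long,
                maxAgeMs: Long): mutable.LinkedHashMap[String, AppInfo] = {
    // 1. Rebuild the application map without expired entries, so stale
    //    links disappear from the UI before any file is touched.
    val kept = apps.filter { case (_, info) => now - info.lastUpdated <= maxAgeMs }
    // 2. Only then delete the expired directories, handling each failure
    //    individually so one bad directory doesn't abort the whole sweep.
    apps.keys.filterNot(kept.contains).foreach { id =>
      try deleteDir(id)
      catch { case e: Exception => Console.err.println(s"Failed to delete $id: $e") }
    }
    kept
  }
}
```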
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-59302984 @viper-kun mostly good, just a few minor things left as far as I'm concerned.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18935441

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---

@@ -46,8 +58,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private val fs = Utils.getHadoopFileSystem(resolvedLogDir,
     SparkHadoopUtil.get.newConfiguration(conf))

-  // A timestamp of when the disk was last accessed to check for log updates
-  private var lastLogCheckTimeMs = -1L
+  // The schedule thread pool size must be one,otherwise it will have concurrent issues about fs
+  // and applications between check task and clean task..
+  private val pool = Executors.newScheduledThreadPool(1)

--- End diff --

Ah, another thing: you should override `stop()` and shut down this executor cleanly (it's mostly a best effort thing, but still).
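The `stop()` override vanzin asks for can be sketched as a standard best-effort executor shutdown; the class name and 5-second grace period below are illustrative choices, not Spark's:

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Sketch of a clean, best-effort shutdown for the scheduled pool.
class StoppableProviderSketch {
  private val pool = Executors.newScheduledThreadPool(1)

  def stop(): Unit = {
    pool.shutdown() // stop accepting new tasks; running tasks may finish
    if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
      pool.shutdownNow() // give up and interrupt anything still running
    }
  }
}
```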
Github user viper-kun commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-59038886 In my opinion, Spark creates the event log data, so Spark should delete it. In Hadoop, event logs are deleted by the JobHistoryServer, not by the filesystem.
Github user viper-kun commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18763243 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -214,6 +224,43 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis } --- End diff -- @vanzin sorry, I do not understand what you mean. Do you mean that we should not throw Throwable?
Github user mattf commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-58885115

> @mattf I understand what you're trying to say, but think about it in context. As I said above, the "when to poll the file system" code is the most trivial part of this change. The only advantage of using cron for that is that you'd have more scheduling options - e.g., absolute times instead of a period. To achieve that, you'd be considerably complicating everything else. You'd be creating a new command line tool in Spark, that needs to deal with command line arguments, be documented, and handle security settings (e.g. kerberos) - so it's more burden for everybody, maintainers of the code and admins alike. And all that for a trivial, and I'd say, not really needed gain in functionality.

@aw-altiscale pointed me to camus, which has a nearly separable component: https://github.com/linkedin/camus/tree/master/camus-sweeper

my objection to this is about the architecture and responsibilities of the spark components. i don't object to having the functionality. i think you should implement the ability to sweep/rotate/clean log files in hdfs, but not as part of a spark process.
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-58924618

> i think you should implement the ability to sweep/rotate/clean log files in hdfs

Well, good luck with adding something like that to HDFS... that is not the responsibility of filesystems. It's great that there might be tools out there that do it, but, personally, you still haven't convinced me that a directory that is completely managed within Spark (the event log directory) shouldn't also be cleaned up by Spark.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18781725

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -214,6 +224,43 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
 }
--- End diff --

Throwables are catastrophic errors; you shouldn't catch them. (IIRC they're also used as control flow for some Scala idioms, which also means you shouldn't catch them.) Catch `Exception` instead.
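In Scala the conventional way to express "catch everything recoverable" is `scala.util.control.NonFatal`, which matches ordinary exceptions but lets fatal errors and control-flow throwables propagate. A minimal sketch of the reviewer's point (the `scan` body is a hypothetical stand-in for the provider's log-scanning code):

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for the provider's log-scanning body.
def scan(): Unit = throw new RuntimeException("boom")

// Returns true if the scan completed, false if a recoverable error was logged.
def checkForLogs(): Boolean =
  try {
    scan()
    true
  } catch {
    // NonFatal matches RuntimeException and friends, but re-throws
    // VirtualMachineError, InterruptedException and Scala's
    // ControlThrowable, unlike a blanket `case t: Throwable`.
    case NonFatal(e) =>
      Console.err.println(s"Exception in checking for event log updates: $e")
      false
  }
```

`case e: Exception`, as suggested in the comment, behaves almost identically; `NonFatal` additionally re-throws `InterruptedException` so thread interruption still works.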
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18782185

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
       }
     }
-    val newIterator = logInfos.iterator.buffered
-    val oldIterator = applications.values.iterator.buffered
-    while (newIterator.hasNext && oldIterator.hasNext) {
-      if (newIterator.head.endTime > oldIterator.head.endTime) {
-        addIfAbsent(newIterator.next)
-      } else {
-        addIfAbsent(oldIterator.next)
+    applications.synchronized {
+      val newIterator = logInfos.iterator.buffered
+      val oldIterator = applications.values.iterator.buffered
+      while (newIterator.hasNext && oldIterator.hasNext) {
+        if (newIterator.head.endTime > oldIterator.head.endTime) {
+          addIfAbsent(newIterator.next)
+        } else {
+          addIfAbsent(oldIterator.next)
+        }
       }
+      newIterator.foreach(addIfAbsent)
+      oldIterator.foreach(addIfAbsent)
+
+      applications = newApps
     }
-    newIterator.foreach(addIfAbsent)
-    oldIterator.foreach(addIfAbsent)
   }
 } catch {
+  case t: Throwable => logError("Exception in checking for event log updates", t)
 }

+  /**
+   * Deleting apps if setting cleaner.
+   */
+  private def cleanLogs() = {
+    lastLogCleanTimeMs = getMonotonicTimeMs()
+    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
+    try {
+      val logStatus = fs.listStatus(new Path(resolvedLogDir))
+      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
+      val maxAge = conf.getLong("spark.history.fs.maxAge.seconds",
+        DEFAULT_SPARK_HISTORY_FS_MAXAGE_S) * 1000
+
+      val now = System.currentTimeMillis()
+      fs.synchronized {
+        // scan all logs from the log directory.
+        // Only directories older than this many seconds will be deleted.
+        logDirs.foreach { dir =>
+          // history file older than this many seconds will be deleted
+          // when the history cleaner runs.
+          if (now - getModificationTime(dir) > maxAge) {
+            fs.delete(dir.getPath, true)
--- End diff --

Because the existing `try..catch` means that if you fail to delete a directory, you'll stop trying to delete others. So if a directory in the middle of the list has wrong permissions, you'll never clean up any directory that is more recent than it (well, depending on the order in which HDFS returns the file list).
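The fix the reviewer is driving at is per-entry error handling, so one undeletable directory doesn't abort the whole sweep. A sketch with hypothetical names (`delete` stands in for `fs.delete`):

```scala
import scala.collection.mutable

// Attempt every directory even if some deletions fail; return the failures
// so the caller can log them and retry on the next cleaner run.
def sweep(dirs: Seq[String], delete: String => Unit): Seq[String] = {
  val failed = mutable.Buffer[String]()
  dirs.foreach { dir =>
    try {
      delete(dir)
    } catch {
      case e: Exception => failed += dir // record and keep going
    }
  }
  failed.toSeq
}
```

With the try..catch outside the loop, a failure on the second of three directories would leave the third untouched; inside the loop, every entry is attempted.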
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18782362

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
-    val newIterator = logInfos.iterator.buffered
-    val oldIterator = applications.values.iterator.buffered
-    while (newIterator.hasNext && oldIterator.hasNext) {
-      if (newIterator.head.endTime > oldIterator.head.endTime) {
-        addIfAbsent(newIterator.next)
-      } else {
-        addIfAbsent(oldIterator.next)
+    applications.synchronized {
--- End diff --

You shouldn't ever need to synchronize on `applications` because it's a read-only variable. It's replaced atomically with a new list (which is why it's volatile) when there are changes, so synchronizing on it doesn't achieve anything. Your explanation also doesn't cover a whole lot of other sources of races when two tasks are running concurrently and looking at the current state of the file system (and potentially modifying it). So yeah, having these tasks run single-threaded would be much simpler to reason about.
Github user aw-altiscale commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-58933013 *appears in a puff of smoke* Hi. I did a very quick skim. You might be interested in HDFS-6382.
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-58934284 inotify is certainly interesting for the HistoryServer, but kinda orthogonal to this. The other bug is more related to this one, and they're discussing exactly what I mentioned: that deleting things is not the responsibility of the fs. The applications that manage the files are the ones with the most context about when it's safe to delete things and how to do so. This case is a simplified one in that the "when" doesn't matter much, as long as the list of things that exist is kept up-to-date in the application. But the point remains that the fs seems like the wrong place for that; and having a sweeper-like feature in HDFS without inotify would be kinda bad for applications like the HistoryServer. But I digress. Even if HDFS-6382 is implemented, it would only come in a new version of HDFS, and Spark needs to work with existing versions...
Github user viper-kun commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18739911

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
[same hunk as quoted in the earlier messages, ending at]
+    applications.synchronized {
--- End diff --

I think there is a need for the two tasks to never run concurrently. If the order is:

1. check task gets applications
2. clean task gets applications
3. clean task gets its result and replaces applications
4. check task gets its result and replaces applications

then the clean task's result is overwritten by the check task's result. Using a ScheduledExecutorService with a single worker thread is a good way to solve it.
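The suggestion in concrete form (interval values and task wiring hypothetical): a single-threaded `ScheduledExecutorService` runs both periodic tasks on one worker, so a check and a clean can never interleave.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// One worker thread means the two periodic tasks are serialized, so the
// clean task can never race the check task for `applications`.
val pool = Executors.newSingleThreadScheduledExecutor()

def schedule(intervalMs: Long)(body: => Unit) =
  pool.scheduleAtFixedRate(new Runnable { def run(): Unit = body },
    0, intervalMs, TimeUnit.MILLISECONDS)

// Hypothetical wiring:
// schedule(updateIntervalMs) { checkForLogs() }
// schedule(cleanIntervalMs)  { cleanLogs() }
```

`scheduleAtFixedRate` also skips a run rather than overlapping it when one task overstays its period, which a hand-rolled sleep loop has to handle itself.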
Github user viper-kun commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18740124

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
[same hunk as quoted in the earlier messages, continuing with]
+      val newApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+      def addIfNotExpire(info: FsApplicationHistoryInfo) = {
+        if (now - info.lastUpdated <= maxAge) {
+          newApps += (info.id -> info)
--- End diff --

info.lastUpdated is the timestamp of the directory, and info.lastUpdated is always bigger than the files' timestamps.
Github user viper-kun commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18740169

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
[same hunk as quoted in the earlier messages, ending at]
+          if (now - getModificationTime(dir) > maxAge) {
+            fs.delete(dir.getPath, true)
--- End diff --

Can you tell me the detailed reason for adding try..catch around fs.delete? I think the exception would be caught by the try..catch (line 271).
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18740457

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
[same hunk as quoted in the earlier messages, ending at]
+    lastLogCleanTimeMs = getMonotonicTimeMs()
+    logDebug("Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs))
--- End diff --

Nit: string interpolation is probably clearer: `s"Cleaning ... now $lastLogCleanTimeMs"`
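For reference, the two forms produce identical output; the interpolated version just avoids the separate format string:

```scala
val lastLogCleanTimeMs = 1234L

// Java-style format call, as in the patch under review.
val viaFormat = "Cleaning logs. Time is now %d.".format(lastLogCleanTimeMs)

// Scala string interpolation, as suggested in the nit.
val viaInterp = s"Cleaning logs. Time is now $lastLogCleanTimeMs."
```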
Github user viper-kun commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18741084

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
[same hunk as quoted in the earlier messages, ending at]
+ } catch {
+   case t: Throwable => logError("Exception in checking for event log updates", t)
--- End diff --

You mean: don't catch Throwable? What should we do instead?
Github user viper-kun commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-58624839 @mattf @vanzin is this ok to go?
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18737207 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -28,15 +28,27 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils +import java.util.concurrent.TimeUnit --- End diff -- nit: should be up there with other java imports
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18737213 --- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala --- @@ -28,15 +28,27 @@ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.scheduler._ import org.apache.spark.ui.SparkUI import org.apache.spark.util.Utils +import java.util.concurrent.TimeUnit +import scala.concurrent.duration.Duration --- End diff -- nit: should be up there with other scala imports
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18737263

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -54,35 +66,57 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   // is already known.
   private var lastModifiedTime = -1L

+  // A timestamp of when the disk was last accessed to check for event log to delete
+  private var lastLogCleanTimeMs = -1L
+
   // Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
   // into the map in order, so the LinkedHashMap maintains the correct ordering.
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()

   /**
-   * A background thread that periodically checks for event log updates on disk.
-   *
-   * If a log check is invoked manually in the middle of a period, this thread re-adjusts the
-   * time at which it performs the next log check to maintain the same period as before.
+   * A background thread that periodically do something about event log.
    *
-   * TODO: Add a mechanism to update manually.
+   * If operateFun is invoked manually in the middle of a period, this thread re-adjusts the
+   * time at which it does operateFun to maintain the same period as before.
    */
-  private val logCheckingThread = new Thread("LogCheckingThread") {
-    override def run() = Utils.logUncaughtExceptions {
-      while (true) {
-        val now = getMonotonicTimeMs()
-        if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
-          Thread.sleep(UPDATE_INTERVAL_MS)
-        } else {
-          // If the user has manually checked for logs recently, wait until
-          // UPDATE_INTERVAL_MS after the last check time
-          Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
+  private def getThread(
+      name: String,
+      lastTimeMsFun: () => Long,
--- End diff --

You could manage this value inside the thread itself and avoid having the fields for each thread and the extra code to pass it to this method.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18737282

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -54,35 +66,57 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
[same hunk as quoted in the previous message, ending at]
+  private def getThread(
--- End diff --

I wonder if it isn't better to use a `ScheduledExecutorService` here.
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18737405

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -195,22 +241,68 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
[same hunk as quoted in the earlier messages, ending at]
+    applications.synchronized {
--- End diff --

There isn't a need to synchronize on `applications` (here or in the other method). The idea is that you create the new list and atomically replace the old one (note how it's `volatile`).
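The pattern being described, reduced to a sketch (class name and value types simplified for illustration): readers never lock, and writers publish a fully built replacement map in a single reference assignment.

```scala
import scala.collection.mutable

class AppRegistry {
  // Volatile so a reference swap is immediately visible to reader threads;
  // the map itself is never mutated after publication.
  @volatile private var applications: mutable.LinkedHashMap[String, Long] =
    new mutable.LinkedHashMap()

  def lookup(id: String): Option[Long] = applications.get(id) // lock-free read

  def publish(newApps: mutable.LinkedHashMap[String, Long]): Unit =
    applications = newApps // atomic replacement, no synchronized needed
}
```

Readers see either the old map or the new one, never a half-built state, which is why synchronizing on the field buys nothing here.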
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2471#discussion_r18737439

--- Diff: core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---
@@ -145,43 +186,48 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
   private def checkForLogs() = {
     lastLogCheckTimeMs = getMonotonicTimeMs()
     logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
-    try {
-      val logStatus = fs.listStatus(new Path(resolvedLogDir))
-      val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
+    var logInfos: Seq[FsApplicationHistoryInfo] = null

-      // Load all new logs from the log directory. Only directories that have a modification time
-      // later than the last known log directory will be loaded.
-      var newLastModifiedTime = lastModifiedTime
-      val logInfos = logDirs
-        .filter { dir =>
-          if (fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
-            val modTime = getModificationTime(dir)
-            newLastModifiedTime = math.max(newLastModifiedTime, modTime)
-            modTime > lastModifiedTime
-          } else {
-            false
+    // Load all new logs from the log directory. Only directories that have a modification time
+    // later than the last known log directory will be loaded.
+    var newLastModifiedTime = lastModifiedTime
+
+    try {
+      fs.synchronized {
--- End diff --

What's this synchronization trying to achieve? If there's really a need for the two tasks to never run concurrently, I'd say it's better to use a `ScheduledExecutorService` with a single worker thread.