Repository: spark Updated Branches: refs/heads/master 4e6fc6901 -> 4b88393cb
[SPARK-21922] Fix duration always updating when task failed but status is still RUNNING ## What changes were proposed in this pull request? When the driver quits abnormally, it causes executor shutdown, and task metrics cannot be sent to the driver for updating. In this case the status will always be 'RUNNING' and the duration on the history UI will be 'CurrentTime - launchTime', which increases infinitely. We can fix this by using the modification time of the event log, since this time is obtained when `FsHistoryProvider` fetches the event log from the file system. The resulting picture is uploaded in [SPARK-21922](https://issues.apache.org/jira/browse/SPARK-21922). How to reproduce? (1) Submit a job to Spark on YARN. (2) Mock an OOM (or another scenario that can make the driver quit abnormally) for the driver. (3) Make sure an executor is running a task when the driver quits. (4) Open the history server and check the result. It is not a corner case, since there are many such jobs in our current cluster. ## How was this patch tested? Deployed the history server and opened a job that has this problem. Author: zhoukang <zhoukang199...@gmail.com> Closes #19132 from caneGuy/zhoukang/fix-duration. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b88393c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b88393c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b88393c Branch: refs/heads/master Commit: 4b88393cb9f4c77f479749edf2377ed6b91280c0 Parents: 4e6fc69 Author: zhoukang <zhoukang199...@gmail.com> Authored: Thu Sep 14 20:40:33 2017 +0800 Committer: jerryshao <ss...@hortonworks.com> Committed: Thu Sep 14 20:40:33 2017 +0800 ---------------------------------------------------------------------- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- .../org/apache/spark/status/api/v1/AllStagesResource.scala | 8 +++++--- .../org/apache/spark/status/api/v1/OneStageResource.scala | 5 ++++- core/src/main/scala/org/apache/spark/ui/SparkUI.scala | 8 ++++++-- .../src/main/scala/org/apache/spark/ui/jobs/StagePage.scala | 9 +++++++-- .../src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 1 + core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala | 5 +++-- 7 files changed, 27 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 687fd2d..20fe911 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -249,7 +249,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) val appSecManager = new SecurityManager(conf) SparkUI.createHistoryUI(conf, replayBus, appSecManager, appInfo.name, 
HistoryServer.getAttemptURI(appId, attempt.attemptId), - attempt.startTime) + Some(attempt.lastUpdated), attempt.startTime) // Do not call ui.bind() to avoid creating a new server for each application } http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 5602871..4a4ed95 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -47,6 +47,7 @@ private[v1] class AllStagesResource(ui: SparkUI) { listener.stageIdToData.get((stageInfo.stageId, stageInfo.attemptId)) } } yield { + stageUiData.lastUpdateTime = ui.lastUpdateTime AllStagesResource.stageUiToStageData(status, stageInfo, stageUiData, includeDetails = false) } } @@ -69,7 +70,8 @@ private[v1] object AllStagesResource { } val taskData = if (includeDetails) { - Some(stageUiData.taskData.map { case (k, v) => k -> convertTaskData(v) } ) + Some(stageUiData.taskData.map { case (k, v) => + k -> convertTaskData(v, stageUiData.lastUpdateTime) }) } else { None } @@ -136,13 +138,13 @@ private[v1] object AllStagesResource { } } - def convertTaskData(uiData: TaskUIData): TaskData = { + def convertTaskData(uiData: TaskUIData, lastUpdateTime: Option[Long]): TaskData = { new TaskData( taskId = uiData.taskInfo.taskId, index = uiData.taskInfo.index, attempt = uiData.taskInfo.attemptNumber, launchTime = new Date(uiData.taskInfo.launchTime), - duration = uiData.taskDuration, + duration = uiData.taskDuration(lastUpdateTime), executorId = uiData.taskInfo.executorId, host = uiData.taskInfo.host, status = uiData.taskInfo.status, 
http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala index 3e6d294..f15073b 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneStageResource.scala @@ -35,6 +35,7 @@ private[v1] class OneStageResource(ui: SparkUI) { def stageData(@PathParam("stageId") stageId: Int): Seq[StageData] = { withStage(stageId) { stageAttempts => stageAttempts.map { stage => + stage.ui.lastUpdateTime = ui.lastUpdateTime AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, includeDetails = true) } @@ -47,6 +48,7 @@ private[v1] class OneStageResource(ui: SparkUI) { @PathParam("stageId") stageId: Int, @PathParam("stageAttemptId") stageAttemptId: Int): StageData = { withStageAttempt(stageId, stageAttemptId) { stage => + stage.ui.lastUpdateTime = ui.lastUpdateTime AllStagesResource.stageUiToStageData(stage.status, stage.info, stage.ui, includeDetails = true) } @@ -81,7 +83,8 @@ private[v1] class OneStageResource(ui: SparkUI) { @DefaultValue("20") @QueryParam("length") length: Int, @DefaultValue("ID") @QueryParam("sortBy") sortBy: TaskSorting): Seq[TaskData] = { withStageAttempt(stageId, stageAttemptId) { stage => - val tasks = stage.ui.taskData.values.map{AllStagesResource.convertTaskData}.toIndexedSeq + val tasks = stage.ui.taskData.values + .map{ AllStagesResource.convertTaskData(_, ui.lastUpdateTime)}.toIndexedSeq .sorted(OneStageResource.ordering(sortBy)) tasks.slice(offset, offset + length) } http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/ui/SparkUI.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 589f811..f3fcf27 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -50,6 +50,7 @@ private[spark] class SparkUI private ( val operationGraphListener: RDDOperationGraphListener, var appName: String, val basePath: String, + val lastUpdateTime: Option[Long] = None, val startTime: Long) extends WebUI(securityManager, securityManager.getSSLOptions("ui"), SparkUI.getUIPort(conf), conf, basePath, "SparkUI") @@ -176,9 +177,11 @@ private[spark] object SparkUI { securityManager: SecurityManager, appName: String, basePath: String, + lastUpdateTime: Option[Long], startTime: Long): SparkUI = { val sparkUI = create( - None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime) + None, conf, listenerBus, securityManager, appName, basePath, + lastUpdateTime = lastUpdateTime, startTime = startTime) val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory], Utils.getContextOrSparkClassLoader).asScala @@ -204,6 +207,7 @@ private[spark] object SparkUI { appName: String, basePath: String = "", jobProgressListener: Option[JobProgressListener] = None, + lastUpdateTime: Option[Long] = None, startTime: Long): SparkUI = { val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse { @@ -226,6 +230,6 @@ private[spark] object SparkUI { new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener, executorsListener, _jobProgressListener, storageListener, operationGraphListener, - appName, basePath, startTime) + appName, basePath, lastUpdateTime, startTime) } } http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala 
---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index 633e740..4d80308 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -299,6 +299,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") { stageData.hasShuffleRead, stageData.hasShuffleWrite, stageData.hasBytesSpilled, + parent.lastUpdateTime, currentTime, pageSize = taskPageSize, sortColumn = taskSortColumn, @@ -863,6 +864,7 @@ private[ui] class TaskDataSource( hasShuffleRead: Boolean, hasShuffleWrite: Boolean, hasBytesSpilled: Boolean, + lastUpdateTime: Option[Long], currentTime: Long, pageSize: Int, sortColumn: String, @@ -889,8 +891,9 @@ private[ui] class TaskDataSource( private def taskRow(taskData: TaskUIData): TaskTableRowData = { val info = taskData.taskInfo val metrics = taskData.metrics - val duration = taskData.taskDuration.getOrElse(1L) - val formatDuration = taskData.taskDuration.map(d => UIUtils.formatDuration(d)).getOrElse("") + val duration = taskData.taskDuration(lastUpdateTime).getOrElse(1L) + val formatDuration = + taskData.taskDuration(lastUpdateTime).map(d => UIUtils.formatDuration(d)).getOrElse("") val schedulerDelay = metrics.map(getSchedulerDelay(info, _, currentTime)).getOrElse(0L) val gcTime = metrics.map(_.jvmGCTime).getOrElse(0L) val taskDeserializationTime = metrics.map(_.executorDeserializeTime).getOrElse(0L) @@ -1154,6 +1157,7 @@ private[ui] class TaskPagedTable( hasShuffleRead: Boolean, hasShuffleWrite: Boolean, hasBytesSpilled: Boolean, + lastUpdateTime: Option[Long], currentTime: Long, pageSize: Int, sortColumn: String, @@ -1179,6 +1183,7 @@ private[ui] class TaskPagedTable( hasShuffleRead, hasShuffleWrite, hasBytesSpilled, + lastUpdateTime, currentTime, pageSize, sortColumn, 
http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala index 799d769..0787ea6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala @@ -30,6 +30,7 @@ private[ui] class StagesTab(parent: SparkUI) extends SparkUITab(parent, "stages" val progressListener = parent.jobProgressListener val operationGraphListener = parent.operationGraphListener val executorsListener = parent.executorsListener + val lastUpdateTime = parent.lastUpdateTime attachPage(new AllStagesPage(this)) attachPage(new StagePage(this)) http://git-wip-us.apache.org/repos/asf/spark/blob/4b88393c/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala index 9448baa..d9c87f6 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/UIData.scala @@ -97,6 +97,7 @@ private[spark] object UIData { var memoryBytesSpilled: Long = _ var diskBytesSpilled: Long = _ var isBlacklisted: Int = _ + var lastUpdateTime: Option[Long] = None var schedulingPool: String = "" var description: Option[String] = None @@ -133,9 +134,9 @@ private[spark] object UIData { _metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics) } - def taskDuration: Option[Long] = { + def taskDuration(lastUpdateTime: Option[Long] = None): Option[Long] = { if (taskInfo.status == "RUNNING") { - Some(_taskInfo.timeRunning(System.currentTimeMillis)) + Some(_taskInfo.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis))) } else { 
_metrics.map(_.executorRunTime) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org