Github user ajbozarth commented on a diff in the pull request:
https://github.com/apache/spark/pull/19750#discussion_r151281491
--- Diff: core/src/main/scala/org/apache/spark/SparkStatusTracker.scala ---
@@ -89,39 +83,33 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
* garbage collected.
*/
def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
- jobProgressListener.synchronized {
- for (
- info <- jobProgressListener.stageIdToInfo.get(stageId);
- data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
- ) yield {
- new SparkStageInfoImpl(
- stageId,
- info.attemptId,
- info.submissionTime.getOrElse(0),
- info.name,
- info.numTasks,
- data.numActiveTasks,
- data.numCompleteTasks,
- data.numFailedTasks)
- }
+ store.asOption(store.lastStageAttempt(stageId)).map { stage =>
+ new SparkStageInfoImpl(
+ stageId,
+ stage.attemptId,
+ stage.submissionTime.map(_.getTime()).getOrElse(0L),
+ stage.name,
+ stage.numTasks,
+ stage.numActiveTasks,
+ stage.numCompleteTasks,
+ stage.numFailedTasks)
}
}
/**
* Returns information of all known executors, including host, port, cacheSize, numRunningTasks.
*/
def getExecutorInfos: Array[SparkExecutorInfo] = {
- val executorIdToRunningTasks: Map[String, Int] =
- sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors
-
- sc.getExecutorStorageStatus.map { status =>
- val bmId = status.blockManagerId
+ store.executorList(true).map { exec =>
+ val (host, port) = exec.hostPort.split(":", 2) match {
+ case Array(h, p) => (h, p.toInt)
+ case Array(h) => (h, -1)
+ }
new SparkExecutorInfoImpl(
- bmId.host,
- bmId.port,
- status.cacheSize,
- executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
- )
- }
+ host,
+ port,
+ exec.maxMemory,
--- End diff --
Why this change from cacheSize to maxMemory? Are they synonymous?
---
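For context, a minimal, self-contained sketch of the distinction the question is about, assuming the status store exposes the usual executor-summary fields: memoryUsed (roughly the old cacheSize, i.e. storage memory actually in use) and maxMemory (the executor's storage ceiling). The ExecSummary case class below is a hypothetical stand-in, not Spark's API:

```scala
object CacheSizeVsMaxMemory {
  // Hypothetical stand-in for the executor summary fields; not Spark's API.
  case class ExecSummary(memoryUsed: Long, maxMemory: Long)

  def main(args: Array[String]): Unit = {
    // A freshly started executor with nothing cached yet.
    val fresh = ExecSummary(memoryUsed = 0L, maxMemory = 2L * 1024 * 1024 * 1024)

    // cacheSize-style reporting (old behavior): storage memory actually in use.
    println(s"cacheSize-style: ${fresh.memoryUsed} bytes") // 0

    // maxMemory reporting (the diff as written): the storage ceiling,
    // regardless of whether anything is cached.
    println(s"maxMemory:       ${fresh.maxMemory} bytes") // 2147483648
  }
}
```

Under those assumptions the two values only coincide when the executor's storage memory is completely full, which would suggest they are not interchangeable here.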