Repository: spark
Updated Branches:
  refs/heads/master 4d93b653f -> 0abee534f


[SPARK-14069][SQL] Improve SparkStatusTracker to also track executor information

## What changes were proposed in this pull request?

Track per-executor information: host and port, cache size, and number of running tasks.

TODO: tests
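
A minimal usage sketch of the new API (my illustration, not part of the patch; it assumes a live `SparkContext` named `sc`):

```scala
import org.apache.spark.SparkExecutorInfo

// Sketch: enumerate the per-executor records this patch exposes.
val infos: Array[SparkExecutorInfo] = sc.statusTracker.getExecutorInfos
infos.foreach { info =>
  println(s"${info.host}:${info.port} cached=${info.cacheSize} bytes, " +
    s"running=${info.numRunningTasks} tasks")
}
```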

## How was this patch tested?

N/A

Author: Wenchen Fan <wenc...@databricks.com>

Closes #11888 from cloud-fan/status-tracker.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0abee534
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0abee534
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0abee534

Branch: refs/heads/master
Commit: 0abee534f0ad9bbe84d8d3d3478ecaa594f1e0f4
Parents: 4d93b65
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Mar 31 12:07:19 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Mar 31 12:07:19 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/SparkExecutorInfo.java     | 33 ++++++++++++++++++++
 .../scala/org/apache/spark/SparkContext.scala   |  3 +-
 .../org/apache/spark/SparkStatusTracker.scala   | 20 ++++++++++++
 .../scala/org/apache/spark/StatusAPIImpl.scala  | 33 ++++++++++++--------
 .../spark/scheduler/TaskSchedulerImpl.scala     |  2 ++
 .../org/apache/spark/storage/StorageUtils.scala |  5 ++-
 6 files changed, 80 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0abee534/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/spark/SparkExecutorInfo.java b/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
new file mode 100644
index 0000000..dc3e826
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/SparkExecutorInfo.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark;
+
+import java.io.Serializable;
+
+/**
+ * Exposes information about Spark Executors.
+ *
+ * This interface is not designed to be implemented outside of Spark.  We may add additional methods
+ * which may break binary compatibility with outside implementations.
+ */
+public interface SparkExecutorInfo extends Serializable {
+  String host();
+  int port();
+  long cacheSize();
+  int numRunningTasks();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/0abee534/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index dcb41f3..d7cb253 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -147,8 +147,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       appName: String,
       sparkHome: String = null,
       jars: Seq[String] = Nil,
-      environment: Map[String, String] = Map()) =
-  {
+      environment: Map[String, String] = Map()) = {
     this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/0abee534/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
index 34ee3a4..52c4656 100644
--- a/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
+++ b/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark
 
+import org.apache.spark.scheduler.TaskSchedulerImpl
+
 /**
  * Low-level status reporting APIs for monitoring job and stage progress.
  *
@@ -104,4 +106,22 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
       }
     }
   }
+
+  /**
+   * Returns information of all known executors, including host, port, cacheSize, numRunningTasks.
+   */
+  def getExecutorInfos: Array[SparkExecutorInfo] = {
+    val executorIdToRunningTasks: Map[String, Int] =
+      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors()
+
+    sc.getExecutorStorageStatus.map { status =>
+      val bmId = status.blockManagerId
+      new SparkExecutorInfoImpl(
+        bmId.host,
+        bmId.port,
+        status.cacheSize,
+        executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
+      )
+    }
+  }
 }
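
A hedged aside on the method above (not part of the patch): because `getExecutorInfos` returns a plain array, cluster-wide aggregates fall out directly. A sketch, again assuming a live `SparkContext` named `sc`:

```scala
// Sketch: fold the per-executor records into cluster-wide totals.
val infos = sc.statusTracker.getExecutorInfos
val totalCacheBytes   = infos.map(_.cacheSize).sum
val totalRunningTasks = infos.map(_.numRunningTasks).sum
println(s"executors=${infos.length}, cached=$totalCacheBytes bytes, " +
  s"running=$totalRunningTasks tasks")
```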

http://git-wip-us.apache.org/repos/asf/spark/blob/0abee534/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
index e5c7c8d..c1f24a6 100644
--- a/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
+++ b/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala
@@ -18,18 +18,25 @@
 package org.apache.spark
 
 private class SparkJobInfoImpl (
-  val jobId: Int,
-  val stageIds: Array[Int],
-  val status: JobExecutionStatus)
- extends SparkJobInfo
+    val jobId: Int,
+    val stageIds: Array[Int],
+    val status: JobExecutionStatus)
+  extends SparkJobInfo
 
 private class SparkStageInfoImpl(
-  val stageId: Int,
-  val currentAttemptId: Int,
-  val submissionTime: Long,
-  val name: String,
-  val numTasks: Int,
-  val numActiveTasks: Int,
-  val numCompletedTasks: Int,
-  val numFailedTasks: Int)
- extends SparkStageInfo
+    val stageId: Int,
+    val currentAttemptId: Int,
+    val submissionTime: Long,
+    val name: String,
+    val numTasks: Int,
+    val numActiveTasks: Int,
+    val numCompletedTasks: Int,
+    val numFailedTasks: Int)
+  extends SparkStageInfo
+
+private class SparkExecutorInfoImpl(
+    val host: String,
+    val port: Int,
+    val cacheSize: Long,
+    val numRunningTasks: Int)
+  extends SparkExecutorInfo

http://git-wip-us.apache.org/repos/asf/spark/blob/0abee534/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index f7790fc..daed2ff 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -90,6 +90,8 @@ private[spark] class TaskSchedulerImpl(
   // Number of tasks running on each executor
   private val executorIdToTaskCount = new HashMap[String, Int]
 
+  def runningTasksByExecutors(): Map[String, Int] = executorIdToTaskCount.toMap
+
   // The set of executors we have on each host; this is used to compute hostsAlive, which
   // in turn is used to decide when we can attain data locality on a given host
   protected val executorsByHost = new HashMap[String, HashSet[String]]
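
A hedged note on the `.toMap` above: it copies the mutable `HashMap`, so `runningTasksByExecutors()` hands callers an immutable snapshot rather than a live view of the scheduler's counts. A standalone sketch of that semantics (the names below are mine, not Spark's):

```scala
import scala.collection.mutable

// `.toMap` takes a defensive copy of the mutable map, so later
// scheduler updates do not leak into an already-returned snapshot.
val liveCounts = mutable.HashMap("executor-1" -> 2, "executor-2" -> 0)
val snapshot: Map[String, Int] = liveCounts.toMap
liveCounts("executor-1") = 7            // the scheduler keeps mutating
assert(snapshot("executor-1") == 2)     // the snapshot is unaffected
```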

http://git-wip-us.apache.org/repos/asf/spark/blob/0abee534/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
index 199a5fc..fb9941b 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -175,7 +175,10 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
   def memRemaining: Long = maxMem - memUsed
 
   /** Return the memory used by this block manager. */
-  def memUsed: Long = _nonRddStorageInfo._1 + _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
+  def memUsed: Long = _nonRddStorageInfo._1 + cacheSize
+
+  /** Return the memory used by caching RDDs */
+  def cacheSize: Long = _rddBlocks.keys.toSeq.map(memUsedByRdd).sum
 
   /** Return the disk space used by this block manager. */
   def diskUsed: Long = _nonRddStorageInfo._2 + _rddBlocks.keys.toSeq.map(diskUsedByRdd).sum
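
A toy arithmetic sketch of the decomposition this hunk introduces (the byte counts are invented): `memUsed` is now non-RDD storage plus the new `cacheSize` term.

```scala
// Invented numbers only: memUsed = non-RDD storage + RDD cache size.
val nonRddBytes: Long = 16L << 20  // e.g. broadcast and other non-RDD blocks
val cacheBytes: Long  = 96L << 20  // memory used by cached RDD blocks
val memUsed: Long     = nonRddBytes + cacheBytes
assert(memUsed == (112L << 20))
```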

