This is an automated email from the ASF dual-hosted git repository. jiangxb1987 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 247bebc [SPARK-28561][WEBUI] DAG viz for barrier-execution mode 247bebc is described below commit 247bebcf94df77883ac245aea63e7e871fc7aa44 Author: Kousuke Saruta <saru...@oss.nttdata.com> AuthorDate: Mon Aug 12 22:38:10 2019 -0700 [SPARK-28561][WEBUI] DAG viz for barrier-execution mode ## What changes were proposed in this pull request? In the current UI, we cannot identify which RDDs are barrier RDDs. Visualizing them will make debugging easier. The following images show the UI after this change. ![Screenshot from 2019-07-30 16-30-35](https://user-images.githubusercontent.com/4736016/62110508-83cec100-b2e9-11e9-83b9-bc2e485a4cbe.png) ![Screenshot from 2019-07-30 16-31-09](https://user-images.githubusercontent.com/4736016/62110509-83cec100-b2e9-11e9-9e2e-47c4dae23a52.png) The boxes in pale green indicate barrier mode (we might need to discuss which color is appropriate). ## How was this patch tested? Tested manually. The images above were produced by the following operations. ``` val rdd1 = sc.parallelize(1 to 10) val rdd2 = sc.parallelize(1 to 10) val rdd3 = rdd1.zip(rdd2).barrier.mapPartitions(identity(_)) val rdd4 = rdd3.map(identity(_)) val rdd5 = rdd4.reduceByKey(_+_) rdd5.collect ``` Closes #25296 from sarutak/barrierexec-dagviz. 
Authored-by: Kousuke Saruta <saru...@oss.nttdata.com> Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com> --- .../org/apache/spark/ui/static/spark-dag-viz.css | 12 +++++++++ .../org/apache/spark/ui/static/spark-dag-viz.js | 6 +++++ .../scala/org/apache/spark/status/storeTypes.scala | 4 ++- .../scala/org/apache/spark/storage/RDDInfo.scala | 3 ++- .../main/scala/org/apache/spark/ui/UIUtils.scala | 3 +++ .../apache/spark/ui/scope/RDDOperationGraph.scala | 30 +++++++++++++++++----- .../scala/org/apache/spark/util/JsonProtocol.scala | 4 ++- .../spark/status/AppStatusListenerSuite.scala | 6 ++--- .../org/apache/spark/storage/StorageSuite.scala | 4 +-- .../spark/ui/scope/RDDOperationGraphSuite.scala | 10 ++++---- .../org/apache/spark/util/JsonProtocolSuite.scala | 7 ++--- 11 files changed, 67 insertions(+), 22 deletions(-) diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css index 9cc5c79..1fbc90b 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.css @@ -89,6 +89,12 @@ stroke-width: 2px; } +#dag-viz-graph svg.job g.cluster.barrier rect { + fill: #B4E9E2; + stroke: #32DBC6; + stroke-width: 2px; +} + /* Stage page specific styles */ #dag-viz-graph svg.stage g.cluster rect { @@ -123,6 +129,12 @@ stroke-width: 2px; } +#dag-viz-graph svg.stage g.cluster.barrier rect { + fill: #84E9E2; + stroke: #32DBC6; + stroke-width: 2px; +} + .tooltip-inner { white-space: pre-wrap; } diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js index cf508ac..035d72f 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -172,6 +172,12 @@ function renderDagViz(forJob) { svg.selectAll("g." 
+ nodeId).classed("cached", true); }); + metadataContainer().selectAll(".barrier-rdd").each(function() { + var rddId = d3.select(this).text().trim() + var clusterId = VizConstants.clusterPrefix + rddId + svg.selectAll("g." + clusterId).classed("barrier", true) + }); + resizeSvg(svg); interpretLineBreak(svg); } diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index eea47b3..9da5bea 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -402,7 +402,9 @@ private[spark] class RDDOperationClusterWrapper( val childClusters: Seq[RDDOperationClusterWrapper]) { def toRDDOperationCluster(): RDDOperationCluster = { - val cluster = new RDDOperationCluster(id, name) + val isBarrier = childNodes.exists(_.barrier) + val name = if (isBarrier) this.name + "\n(barrier mode)" else this.name + val cluster = new RDDOperationCluster(id, isBarrier, name) childNodes.foreach(cluster.attachChildNode) childClusters.foreach { child => cluster.attachChildCluster(child.toRDDOperationCluster()) diff --git a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala index 917cfab..27a4d4b 100644 --- a/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala +++ b/core/src/main/scala/org/apache/spark/storage/RDDInfo.scala @@ -29,6 +29,7 @@ class RDDInfo( var name: String, val numPartitions: Int, var storageLevel: StorageLevel, + val isBarrier: Boolean, val parentIds: Seq[Int], val callSite: String = "", val scope: Option[RDDOperationScope] = None) @@ -68,6 +69,6 @@ private[spark] object RDDInfo { rdd.creationSite.shortForm } new RDDInfo(rdd.id, rddName, rdd.partitions.length, - rdd.getStorageLevel, parentIds, callSite, rdd.scope) + rdd.getStorageLevel, rdd.isBarrier(), parentIds, callSite, rdd.scope) } } diff --git 
a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 11d3831..70e24bd 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -425,6 +425,9 @@ private[spark] object UIUtils extends Logging { { g.rootCluster.getCachedNodes.map { n => <div class="cached-rdd">{n.id}</div> + } ++ + g.rootCluster.getBarrierClusters.map { c => + <div class="barrier-rdd">{c.id}</div> } } </div> diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index 540c1c4..9ace324 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -42,7 +42,12 @@ private[spark] case class RDDOperationGraph( rootCluster: RDDOperationCluster) /** A node in an RDDOperationGraph. This represents an RDD. */ -private[spark] case class RDDOperationNode(id: Int, name: String, cached: Boolean, callsite: String) +private[spark] case class RDDOperationNode( + id: Int, + name: String, + cached: Boolean, + barrier: Boolean, + callsite: String) /** * A directed edge connecting two nodes in an RDDOperationGraph. @@ -56,7 +61,10 @@ private[spark] case class RDDOperationEdge(fromId: Int, toId: Int) * This represents any grouping of RDDs, including operation scopes (e.g. textFile, flatMap), * stages, jobs, or any higher level construct. A cluster may be nested inside of other clusters. 
*/ -private[spark] class RDDOperationCluster(val id: String, private var _name: String) { +private[spark] class RDDOperationCluster( + val id: String, + val barrier: Boolean, + private var _name: String) { private val _childNodes = new ListBuffer[RDDOperationNode] private val _childClusters = new ListBuffer[RDDOperationCluster] @@ -75,6 +83,10 @@ private[spark] class RDDOperationCluster(val id: String, private var _name: Stri _childNodes.filter(_.cached) ++ _childClusters.flatMap(_.getCachedNodes) } + def getBarrierClusters: Seq[RDDOperationCluster] = { + _childClusters.filter(_.barrier) ++ _childClusters.flatMap(_.getBarrierClusters) + } + def canEqual(other: Any): Boolean = other.isInstanceOf[RDDOperationCluster] override def equals(other: Any): Boolean = other match { @@ -117,7 +129,7 @@ private[spark] object RDDOperationGraph extends Logging { val stageClusterId = STAGE_CLUSTER_PREFIX + stage.stageId val stageClusterName = s"Stage ${stage.stageId}" + { if (stage.attemptNumber == 0) "" else s" (attempt ${stage.attemptNumber})" } - val rootCluster = new RDDOperationCluster(stageClusterId, stageClusterName) + val rootCluster = new RDDOperationCluster(stageClusterId, false, stageClusterName) var rootNodeCount = 0 val addRDDIds = new mutable.HashSet[Int]() @@ -143,7 +155,7 @@ private[spark] object RDDOperationGraph extends Logging { // TODO: differentiate between the intention to cache an RDD and whether it's actually cached val node = nodes.getOrElseUpdate(rdd.id, RDDOperationNode( - rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.callSite)) + rdd.id, rdd.name, rdd.storageLevel != StorageLevel.NONE, rdd.isBarrier, rdd.callSite)) if (rdd.scope.isEmpty) { // This RDD has no encompassing scope, so we put it directly in the root cluster // This should happen only if an RDD is instantiated outside of a public RDD API @@ -157,7 +169,8 @@ private[spark] object RDDOperationGraph extends Logging { val rddClusters = rddScopes.map { scope => val clusterId = 
scope.id val clusterName = scope.name.replaceAll("\\n", "\\\\n") - clusters.getOrElseUpdate(clusterId, new RDDOperationCluster(clusterId, clusterName)) + clusters.getOrElseUpdate( + clusterId, new RDDOperationCluster(clusterId, false, clusterName)) } // Build the cluster hierarchy for this RDD rddClusters.sliding(2).foreach { pc => @@ -227,7 +240,12 @@ private[spark] object RDDOperationGraph extends Logging { } else { "" } - val label = s"${node.name} [${node.id}]$isCached\n${node.callsite}" + val isBarrier = if (node.barrier) { + " [Barrier]" + } else { + "" + } + val label = s"${node.name} [${node.id}]$isCached$isBarrier\n${node.callsite}" s"""${node.id} [label="${StringEscapeUtils.escapeJava(label)}"]""" } diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index b8ca4ee..6b06975 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -1049,12 +1049,14 @@ private[spark] object JsonProtocol { .map { l => l.extract[List[JValue]].map(_.extract[Int]) } .getOrElse(Seq.empty) val storageLevel = storageLevelFromJson(json \ "Storage Level") + val isBarrier = jsonOption(json \ "Barrier").map(_.extract[Boolean]).getOrElse(false) val numPartitions = (json \ "Number of Partitions").extract[Int] val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int] val memSize = (json \ "Memory Size").extract[Long] val diskSize = (json \ "Disk Size").extract[Long] - val rddInfo = new RDDInfo(rddId, name, numPartitions, storageLevel, parentIds, callsite, scope) + val rddInfo = + new RDDInfo(rddId, name, numPartitions, storageLevel, isBarrier, parentIds, callsite, scope) rddInfo.numCachedPartitions = numCachedPartitions rddInfo.memSize = memSize rddInfo.diskSize = diskSize diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala index 7d73546..4b71a48 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala @@ -698,8 +698,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { val level = StorageLevel.MEMORY_AND_DISK // Submit a stage and make sure the RDDs are recorded. - val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil) - val rdd2Info = new RDDInfo(rdd2b1.rddId, "rdd2", 1, level, Nil) + val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, false, Nil) + val rdd2Info = new RDDInfo(rdd2b1.rddId, "rdd2", 1, level, false, Nil) val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info, rdd2Info), Nil, "details1") listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties())) @@ -1543,7 +1543,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter { val level = StorageLevel.MEMORY_AND_DISK // Submit a stage and make sure the RDDs are recorded. 
- val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, Nil) + val rdd1Info = new RDDInfo(rdd1b1.rddId, "rdd1", 2, level, false, Nil) val stage = new StageInfo(1, 0, "stage1", 4, Seq(rdd1Info), Nil, "details1") listener.onStageSubmitted(SparkListenerStageSubmitted(stage, new Properties())) diff --git a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala index ca35238..5f2abb4 100644 --- a/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/StorageSuite.scala @@ -123,8 +123,8 @@ class StorageSuite extends SparkFunSuite { // For testing StorageUtils.updateRddInfo private def stockRDDInfos: Seq[RDDInfo] = { - val info0 = new RDDInfo(0, "0", 10, memAndDisk, Seq(3)) - val info1 = new RDDInfo(1, "1", 3, memAndDisk, Seq(4)) + val info0 = new RDDInfo(0, "0", 10, memAndDisk, false, Seq(3)) + val info1 = new RDDInfo(1, "1", 3, memAndDisk, false, Seq(4)) Seq(info0, info1) } diff --git a/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphSuite.scala b/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphSuite.scala index 6ddcb5a..e335451 100644 --- a/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/scope/RDDOperationGraphSuite.scala @@ -22,14 +22,14 @@ import org.apache.spark.SparkFunSuite class RDDOperationGraphSuite extends SparkFunSuite { test("Test simple cluster equals") { // create a 2-cluster chain with a child - val c1 = new RDDOperationCluster("1", "Bender") - val c2 = new RDDOperationCluster("2", "Hal") + val c1 = new RDDOperationCluster("1", false, "Bender") + val c2 = new RDDOperationCluster("2", false, "Hal") c1.attachChildCluster(c2) - c1.attachChildNode(new RDDOperationNode(3, "Marvin", false, "collect!")) + c1.attachChildNode(new RDDOperationNode(3, "Marvin", false, false, "collect!")) // create an equal cluster, but without 
the child node - val c1copy = new RDDOperationCluster("1", "Bender") - val c2copy = new RDDOperationCluster("2", "Hal") + val c1copy = new RDDOperationCluster("1", false, "Bender") + val c2copy = new RDDOperationCluster("2", false, "Hal") c1copy.attachChildCluster(c2copy) assert(c1 == c1copy) diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index a093fa6..bbf64be 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -366,14 +366,14 @@ class JsonProtocolSuite extends SparkFunSuite { test("RDDInfo backward compatibility (scope, parent IDs, callsite)") { // "Scope" and "Parent IDs" were introduced in Spark 1.4.0 // "Callsite" was introduced in Spark 1.6.0 - val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, Seq(1, 6, 8), + val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, false, Seq(1, 6, 8), "callsite", Some(new RDDOperationScope("fable"))) val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo) .removeField({ _._1 == "Parent IDs"}) .removeField({ _._1 == "Scope"}) .removeField({ _._1 == "Callsite"}) val expectedRddInfo = new RDDInfo( - 1, "one", 100, StorageLevel.NONE, Seq.empty, "", scope = None) + 1, "one", 100, StorageLevel.NONE, false, Seq.empty, "", scope = None) assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson)) } @@ -857,7 +857,8 @@ private[spark] object JsonProtocolSuite extends Assertions { } private def makeRddInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = { - val r = new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, Seq(1, 4, 7), a.toString) + val r = + new RDDInfo(a, "mayor", b, StorageLevel.MEMORY_AND_DISK, false, Seq(1, 4, 7), a.toString) r.numCachedPartitions = c r.memSize = d r.diskSize = e --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org