Repository: spark
Updated Branches:
  refs/heads/master 49720906c -> 2a0a8f753


[SPARK-23034][SQL] Show RDD/relation names in RDD/Hive table scan nodes

## What changes were proposed in this pull request?
This PR proposes to show RDD/relation names in RDD/Hive table scan nodes.
With this change, the names show up in the web UI and in `explain` results.
For example:
```
scala> sql("CREATE TABLE t(c1 int) USING hive")
scala> sql("INSERT INTO t VALUES(1)")
scala> spark.table("t").explain()
== Physical Plan ==
Scan hive default.t [c1#8], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#8]
         ^^^^^^^^^^^
```
<img width="212" alt="spark-pr-hive" src="https://user-images.githubusercontent.com/692303/44501013-51264c80-a6c6-11e8-94f8-0704aee83bb6.png">

Closes #20226

## How was this patch tested?
Added tests in `DataFrameSuite`, `DatasetSuite`, and `HiveExplainSuite`
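
Each test captures the plain `explain` output and checks the scan line; for example, the Hive case boils down to the following (abbreviated from the `HiveExplainSuite` change in the diff below):
```
withTable("tab") {
  sql("CREATE TABLE tab(c1 int) USING hive")
  val output = new java.io.ByteArrayOutputStream()
  Console.withOut(output) {
    spark.table("tab").explain(extended = false)
  }
  assert(output.toString.contains("Scan hive default.tab"))
}
```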

Closes #22153 from maropu/pr20226.

Lead-authored-by: Takeshi Yamamuro <yamam...@apache.org>
Co-authored-by: Tejas Patil <tej...@fb.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a0a8f75
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a0a8f75
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a0a8f75

Branch: refs/heads/master
Commit: 2a0a8f753bbdc8c251f8e699c0808f35b94cfd20
Parents: 4972090
Author: Takeshi Yamamuro <yamam...@apache.org>
Authored: Thu Aug 23 14:26:10 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Aug 23 14:26:10 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/kafka010/KafkaRelation.scala |  2 +-
 .../org/apache/spark/sql/kafka010/KafkaSource.scala   |  4 ++--
 .../scala/org/apache/spark/sql/SparkSession.scala     |  6 +++---
 .../org/apache/spark/sql/execution/ExistingRDD.scala  | 14 +++++++++++---
 .../spark/sql/execution/arrow/ArrowConverters.scala   |  2 +-
 .../scala/org/apache/spark/sql/DataFrameSuite.scala   | 10 ++++++++++
 .../scala/org/apache/spark/sql/DatasetSuite.scala     | 10 ++++++++++
 .../sql/streaming/FlatMapGroupsWithStateSuite.scala   |  5 ++++-
 .../spark/sql/hive/execution/HiveTableScanExec.scala  |  2 ++
 .../spark/sql/hive/execution/HiveExplainSuite.scala   | 12 ++++++++++++
 10 files changed, 56 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index c31e6ed..9d856c9 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -117,7 +117,7 @@ private[kafka010] class KafkaRelation(
         DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
         cr.timestampType.id)
     }
-    sqlContext.internalCreateDataFrame(rdd, schema).rdd
+    sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schema).rdd
   }
 
   private def getPartitionOffsets(

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 101e649..66ec7e0 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -215,7 +215,7 @@ private[kafka010] class KafkaSource(
     }
     if (start.isDefined && start.get == end) {
       return sqlContext.internalCreateDataFrame(
-        sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
+        sqlContext.sparkContext.emptyRDD[InternalRow].setName("empty"), schema, isStreaming = true)
     }
     val fromPartitionOffsets = start match {
       case Some(prevBatchEndOffset) =>
@@ -299,7 +299,7 @@ private[kafka010] class KafkaSource(
     logInfo("GetBatch generating RDD of offset range: " +
       offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
 
-    sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
+    sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schema, isStreaming = true)
   }
 
   /** Stop this source and free any resources it has allocated. */

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index d9278d8..2b847fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -270,7 +270,7 @@ class SparkSession private(
    */
   @transient
   lazy val emptyDataFrame: DataFrame = {
-    createDataFrame(sparkContext.emptyRDD[Row], StructType(Nil))
+    createDataFrame(sparkContext.emptyRDD[Row].setName("empty"), StructType(Nil))
   }
 
   /**
@@ -395,7 +395,7 @@ class SparkSession private(
    // BeanInfo is not serializable so we must rediscover it remotely for each partition.
       SQLContext.beansToRows(iter, Utils.classForName(className), attributeSeq)
     }
-    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd)(self))
+    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd.setName(rdd.name))(self))
   }
 
   /**
@@ -594,7 +594,7 @@ class SparkSession private(
     } else {
       rowRDD.map { r: Row => InternalRow.fromSeq(r.toSeq) }
     }
-    internalCreateDataFrame(catalystRows, schema)
+    internalCreateDataFrame(catalystRows.setName(rowRDD.name), schema)
   }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index be50a15..2962bec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -103,6 +103,10 @@ case class ExternalRDDScanExec[T](
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 
+  private def rddName: String = Option(rdd.name).map(n => s" $n").getOrElse("")
+
+  override val nodeName: String = s"Scan$rddName"
+
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val outputDataType = outputObjAttr.dataType
@@ -116,7 +120,7 @@ case class ExternalRDDScanExec[T](
   }
 
   override def simpleString: String = {
-    s"Scan $nodeName${output.mkString("[", ",", "]")}"
+    s"$nodeName${output.mkString("[", ",", "]")}"
   }
 }
 
@@ -169,10 +173,14 @@ case class LogicalRDD(
 case class RDDScanExec(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    override val nodeName: String,
+    name: String,
     override val outputPartitioning: Partitioning = UnknownPartitioning(0),
     override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode {
 
+  private def rddName: String = Option(rdd.name).map(n => s" $n").getOrElse("")
+
+  override val nodeName: String = s"Scan $name$rddName"
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 
@@ -189,6 +197,6 @@ case class RDDScanExec(
   }
 
   override def simpleString: String = {
-    s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}"
+    s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}"
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 501520c..6a5ac24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -211,6 +211,6 @@ private[sql] object ArrowConverters {
      ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
     }
     val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
-    sqlContext.internalCreateDataFrame(rdd, schema)
+    sqlContext.internalCreateDataFrame(rdd.setName("arrow"), schema)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 7310087..6f5c730 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2552,4 +2552,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       assert(numJobs == 1)
     }
   }
+
+  test("SPARK-23034 show rdd names in RDD scan nodes") {
+    val rddWithName = spark.sparkContext.parallelize(Row(1, "abc") :: Nil).setName("testRdd")
+    val df2 = spark.createDataFrame(rddWithName, StructType.fromDDL("c0 int, c1 string"))
+    val output2 = new java.io.ByteArrayOutputStream()
+    Console.withOut(output2) {
+      df2.explain(extended = false)
+    }
+    assert(output2.toString.contains("Scan ExistingRDD testRdd"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index cf24eba..6069f28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1498,6 +1498,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       df.where($"city".contains(new java.lang.Character('A'))),
       Seq(Row("Amsterdam")))
   }
+
+  test("SPARK-23034 show rdd names in RDD scan nodes") {
+    val rddWithName = spark.sparkContext.parallelize(SingleData(1) :: Nil).setName("testRdd")
+    val df = spark.createDataFrame(rddWithName)
+    val output = new java.io.ByteArrayOutputStream()
+    Console.withOut(output) {
+      df.explain(extended = false)
+    }
+    assert(output.toString.contains("Scan testRdd"))
+  }
 }
 
 case class TestDataUnion(x: Int, y: Int, z: Int)

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 76511ae..e77ba1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -27,6 +27,7 @@ import org.scalatest.exceptions.TestFailedException
 import org.apache.spark.SparkException
 import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
 import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsWithState
 import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
@@ -1229,6 +1230,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest
       timeoutType: GroupStateTimeout = GroupStateTimeout.NoTimeout,
       batchTimestampMs: Long = NO_TIMESTAMP): FlatMapGroupsWithStateExec = {
    val stateFormatVersion = spark.conf.get(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)
+    val emptyRdd = spark.sparkContext.emptyRDD[InternalRow]
     MemoryStream[Int]
       .toDS
       .groupByKey(x => x)
@@ -1237,7 +1239,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest
         case FlatMapGroupsWithState(f, k, v, g, d, o, s, m, _, t, _) =>
           FlatMapGroupsWithStateExec(
             f, k, v, g, d, o, None, s, stateFormatVersion, m, t,
-            Some(currentBatchTimestamp), Some(currentBatchWatermark), RDDScanExec(g, null, "rdd"))
+            Some(currentBatchTimestamp), Some(currentBatchWatermark),
+            RDDScanExec(g, emptyRdd, "rdd"))
       }.get
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 6052486..b3795b4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -62,6 +62,8 @@ case class HiveTableScanExec(
 
   override def conf: SQLConf = sparkSession.sessionState.conf
 
+  override def nodeName: String = s"Scan hive ${relation.tableMeta.qualifiedName}"
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index a1ce1ea..c349a32 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -170,4 +170,16 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       sql("EXPLAIN EXTENDED CODEGEN SELECT 1")
     }
   }
+
+  test("SPARK-23034 show relation names in Hive table scan nodes") {
+    val tableName = "tab"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName(c1 int) USING hive")
+      val output = new java.io.ByteArrayOutputStream()
+      Console.withOut(output) {
+        spark.table(tableName).explain(extended = false)
+      }
+      assert(output.toString.contains(s"Scan hive default.$tableName"))
+    }
+  }
 }

