Repository: spark
Updated Branches:
  refs/heads/master 49720906c -> 2a0a8f753
[SPARK-23034][SQL] Show RDD/relation names in RDD/Hive table scan nodes

## What changes were proposed in this pull request?

This PR proposes to show RDD/relation names in RDD/Hive table scan nodes, so that the names show up in the web UI and in explain results. For example:

```
scala> sql("CREATE TABLE t(c1 int) USING hive")
scala> sql("INSERT INTO t VALUES(1)")
scala> spark.table("t").explain()
== Physical Plan ==
Scan hive default.t [c1#8], HiveTableRelation `default`.`t`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [c1#8]
          ^^^^^^^^^^^
```

<img width="212" alt="spark-pr-hive" src="https://user-images.githubusercontent.com/692303/44501013-51264c80-a6c6-11e8-94f8-0704aee83bb6.png">

Closes #20226

## How was this patch tested?

Added tests in `DataFrameSuite`, `DatasetSuite`, and `HiveExplainSuite`.

Closes #22153 from maropu/pr20226.

Lead-authored-by: Takeshi Yamamuro <yamam...@apache.org>
Co-authored-by: Tejas Patil <tej...@fb.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a0a8f75
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a0a8f75
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a0a8f75

Branch: refs/heads/master
Commit: 2a0a8f753bbdc8c251f8e699c0808f35b94cfd20
Parents: 4972090
Author: Takeshi Yamamuro <yamam...@apache.org>
Authored: Thu Aug 23 14:26:10 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Aug 23 14:26:10 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/kafka010/KafkaRelation.scala  |  2 +-
 .../org/apache/spark/sql/kafka010/KafkaSource.scala    |  4 ++--
 .../scala/org/apache/spark/sql/SparkSession.scala      |  6 +++---
 .../org/apache/spark/sql/execution/ExistingRDD.scala   | 14 +++++++++++---
 .../spark/sql/execution/arrow/ArrowConverters.scala    |  2 +-
 .../scala/org/apache/spark/sql/DataFrameSuite.scala    | 10 ++++++++++
 .../scala/org/apache/spark/sql/DatasetSuite.scala      | 10 ++++++++++
 .../sql/streaming/FlatMapGroupsWithStateSuite.scala    |  5 ++++-
 .../spark/sql/hive/execution/HiveTableScanExec.scala   |  2 ++
 .../spark/sql/hive/execution/HiveExplainSuite.scala    | 12 ++++++++++++
 10 files changed, 56 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
index c31e6ed..9d856c9 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
@@ -117,7 +117,7 @@ private[kafka010] class KafkaRelation(
         DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(cr.timestamp)),
         cr.timestampType.id)
     }
-    sqlContext.internalCreateDataFrame(rdd, schema).rdd
+    sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schema).rdd
   }
 
   private def getPartitionOffsets(

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 101e649..66ec7e0 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -215,7 +215,7 @@ private[kafka010] class KafkaSource(
     }
     if (start.isDefined && start.get == end) {
       return sqlContext.internalCreateDataFrame(
-        sqlContext.sparkContext.emptyRDD, schema, isStreaming = true)
+        sqlContext.sparkContext.emptyRDD[InternalRow].setName("empty"), schema, isStreaming = true)
     }
     val fromPartitionOffsets = start match {
       case Some(prevBatchEndOffset) =>
@@ -299,7 +299,7 @@ private[kafka010] class KafkaSource(
     logInfo("GetBatch generating RDD of offset range: " +
       offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
 
-    sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
+    sqlContext.internalCreateDataFrame(rdd.setName("kafka"), schema, isStreaming = true)
   }
 
   /** Stop this source and free any resources it has allocated. */
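The Kafka changes above rely on `RDD.setName`, which records a name on the RDD and returns the same RDD instance, so it can be chained inline into the `internalCreateDataFrame` calls. A minimal sketch of the pattern in isolation (the local session setup and the name "myInput" are illustrative, not from this patch):

```scala
import org.apache.spark.sql.SparkSession

// Throwaway local session, just to demonstrate RDD naming.
val spark = SparkSession.builder().master("local[*]").appName("setName-sketch").getOrCreate()

// setName stores the name on the RDD and returns the RDD itself,
// so it composes with whatever call consumes the RDD next.
val rdd = spark.sparkContext.parallelize(Seq(1, 2, 3)).setName("myInput")
assert(rdd.name == "myInput")  // this is the name the scan nodes pick up

spark.stop()
```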
http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index d9278d8..2b847fb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -270,7 +270,7 @@ class SparkSession private(
    */
   @transient
   lazy val emptyDataFrame: DataFrame = {
-    createDataFrame(sparkContext.emptyRDD[Row], StructType(Nil))
+    createDataFrame(sparkContext.emptyRDD[Row].setName("empty"), StructType(Nil))
   }
 
   /**
@@ -395,7 +395,7 @@ class SparkSession private(
       // BeanInfo is not serializable so we must rediscover it remotely for each partition.
       SQLContext.beansToRows(iter, Utils.classForName(className), attributeSeq)
     }
-    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd)(self))
+    Dataset.ofRows(self, LogicalRDD(attributeSeq, rowRdd.setName(rdd.name))(self))
   }
 
   /**
@@ -594,7 +594,7 @@ class SparkSession private(
     } else {
       rowRDD.map { r: Row => InternalRow.fromSeq(r.toSeq) }
     }
-    internalCreateDataFrame(catalystRows, schema)
+    internalCreateDataFrame(catalystRows.setName(rowRDD.name), schema)
   }
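With the `SparkSession` changes above, a name set on the source RDD survives the `createDataFrame` conversion and reaches the plan. A hedged sketch of how a caller would observe this (the local session and the name "events" are illustrative):

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Name the source RDD before handing it to createDataFrame.
val named = spark.sparkContext.parallelize(Seq(Row(1))).setName("events")
val df = spark.createDataFrame(named, StructType.fromDDL("c0 int"))

// With this patch applied, the scan node in the physical plan should read
// something like "Scan ExistingRDD events" (cf. the DataFrameSuite test below).
df.explain()
```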
http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index be50a15..2962bec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -103,6 +103,10 @@ case class ExternalRDDScanExec[T](
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
+  private def rddName: String = Option(rdd.name).map(n => s" $n").getOrElse("")
+
+  override val nodeName: String = s"Scan$rddName"
+
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")
     val outputDataType = outputObjAttr.dataType
@@ -116,7 +120,7 @@ case class ExternalRDDScanExec[T](
   }
 
   override def simpleString: String = {
-    s"Scan $nodeName${output.mkString("[", ",", "]")}"
+    s"$nodeName${output.mkString("[", ",", "]")}"
   }
 }
 
@@ -169,10 +173,14 @@ case class LogicalRDD(
 case class RDDScanExec(
     output: Seq[Attribute],
     rdd: RDD[InternalRow],
-    override val nodeName: String,
+    name: String,
     override val outputPartitioning: Partitioning = UnknownPartitioning(0),
     override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode {
 
+  private def rddName: String = Option(rdd.name).map(n => s" $n").getOrElse("")
+
+  override val nodeName: String = s"Scan $name$rddName"
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
 
@@ -189,6 +197,6 @@ case class RDDScanExec(
   }
 
   override def simpleString: String = {
-    s"Scan $nodeName${Utils.truncatedString(output, "[", ",", "]")}"
+    s"$nodeName${Utils.truncatedString(output, "[", ",", "]")}"
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
index 501520c..6a5ac24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/arrow/ArrowConverters.scala
@@ -211,6 +211,6 @@ private[sql] object ArrowConverters {
       ArrowConverters.fromPayloadIterator(iter.map(new ArrowPayload(_)), context)
     }
     val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
-    sqlContext.internalCreateDataFrame(rdd, schema)
+    sqlContext.internalCreateDataFrame(rdd.setName("arrow"), schema)
   }
 }
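The `rddName` helpers added to `ExternalRDDScanExec` and `RDDScanExec` above guard against `rdd.name` being null (an RDD that was never named) and prepend a space only when a name exists. The same pattern in isolation, with a hypothetical `decorate` helper that is not part of the patch:

```scala
// Mirrors the Option-based null guard in the ExistingRDD.scala changes above.
def decorate(base: String, rawRddName: String): String = {
  val suffix = Option(rawRddName).map(n => s" $n").getOrElse("")
  s"$base$suffix"
}

assert(decorate("Scan ExistingRDD", "testRdd") == "Scan ExistingRDD testRdd")
assert(decorate("Scan ExistingRDD", null) == "Scan ExistingRDD")  // unnamed RDDs keep the plain label
```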
http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 7310087..6f5c730 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -2552,4 +2552,14 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       assert(numJobs == 1)
     }
   }
+
+  test("SPARK-23034 show rdd names in RDD scan nodes") {
+    val rddWithName = spark.sparkContext.parallelize(Row(1, "abc") :: Nil).setName("testRdd")
+    val df2 = spark.createDataFrame(rddWithName, StructType.fromDDL("c0 int, c1 string"))
+    val output2 = new java.io.ByteArrayOutputStream()
+    Console.withOut(output2) {
+      df2.explain(extended = false)
+    }
+    assert(output2.toString.contains("Scan ExistingRDD testRdd"))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index cf24eba..6069f28 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -1498,6 +1498,16 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       df.where($"city".contains(new java.lang.Character('A'))),
       Seq(Row("Amsterdam")))
   }
+
+  test("SPARK-23034 show rdd names in RDD scan nodes") {
+    val rddWithName = spark.sparkContext.parallelize(SingleData(1) :: Nil).setName("testRdd")
+    val df = spark.createDataFrame(rddWithName)
+    val output = new java.io.ByteArrayOutputStream()
+    Console.withOut(output) {
+      df.explain(extended = false)
+    }
+    assert(output.toString.contains("Scan testRdd"))
+  }
 }
 
 case class TestDataUnion(x: Int, y: Int, z: Int)
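Both new tests use the same idiom for asserting on plans: `explain()` prints to stdout rather than returning a string, so the output is captured by redirecting `Console` to an in-memory stream. The idiom factored out on its own (`captureStdout` is an illustrative helper, not part of the patch):

```scala
// Capture whatever `body` prints to Console, e.g. the output of df.explain().
def captureStdout(body: => Unit): String = {
  val buffer = new java.io.ByteArrayOutputStream()
  Console.withOut(buffer) { body }
  buffer.toString
}

// Usage: assert(captureStdout(df.explain(extended = false)).contains("Scan testRdd"))
```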
http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
index 76511ae..e77ba1e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FlatMapGroupsWithStateSuite.scala
@@ -27,6 +27,7 @@ import org.scalatest.exceptions.TestFailedException
 import org.apache.spark.SparkException
 import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
 import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProjection, UnsafeRow}
 import org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsWithState
 import org.apache.spark.sql.catalyst.plans.physical.UnknownPartitioning
@@ -1229,6 +1230,7 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest
       timeoutType: GroupStateTimeout = GroupStateTimeout.NoTimeout,
       batchTimestampMs: Long = NO_TIMESTAMP): FlatMapGroupsWithStateExec = {
     val stateFormatVersion = spark.conf.get(SQLConf.FLATMAPGROUPSWITHSTATE_STATE_FORMAT_VERSION)
+    val emptyRdd = spark.sparkContext.emptyRDD[InternalRow]
     MemoryStream[Int]
       .toDS
       .groupByKey(x => x)
@@ -1237,7 +1239,8 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest
       case FlatMapGroupsWithState(f, k, v, g, d, o, s, m, _, t, _) =>
         FlatMapGroupsWithStateExec(
           f, k, v, g, d, o, None, s, stateFormatVersion, m, t,
-          Some(currentBatchTimestamp), Some(currentBatchWatermark), RDDScanExec(g, null, "rdd"))
+          Some(currentBatchTimestamp), Some(currentBatchWatermark),
+          RDDScanExec(g, emptyRdd, "rdd"))
       }.get
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 6052486..b3795b4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -62,6 +62,8 @@ case class HiveTableScanExec(
 
   override def conf: SQLConf = sparkSession.sessionState.conf
 
+  override def nodeName: String = s"Scan hive ${relation.tableMeta.qualifiedName}"
+
   override lazy val metrics = Map(
     "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

http://git-wip-us.apache.org/repos/asf/spark/blob/2a0a8f75/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index a1ce1ea..c349a32 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -170,4 +170,16 @@ class HiveExplainSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
       sql("EXPLAIN EXTENDED CODEGEN SELECT 1")
     }
   }
+
+  test("SPARK-23034 show relation names in Hive table scan nodes") {
+    val tableName = "tab"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName(c1 int) USING hive")
+      val output = new java.io.ByteArrayOutputStream()
+      Console.withOut(output) {
+        spark.table(tableName).explain(extended = false)
+      }
+      assert(output.toString.contains(s"Scan hive default.$tableName"))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org