GitHub user cloud-fan opened a pull request:
https://github.com/apache/spark/pull/20647
[SPARK-23303][SQL] improve the explain result for data source v2 relations
## What changes were proposed in this pull request?
The current explain result for data source v2 relations is unreadable:
```
== Parsed Logical Plan ==
'Filter ('i > 6)
+- AnalysisBarrier
      +- Project [j#1]
         +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Filter (i#0 > 6)
   +- Project [j#1, i#0]
      +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
== Optimized Logical Plan ==
Project [j#1]
+- Filter isnotnull(i#0)
   +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
== Physical Plan ==
*(1) Project [j#1]
+- *(1) Filter isnotnull(i#0)
   +- *(1) DataSourceV2Scan [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
```
After this PR:
```
== Parsed Logical Plan ==
'Project [unresolvedalias('j, None)]
+- AnalysisBarrier
      +- Relation AdvancedDataSourceV2[i#0, j#1]
== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Relation AdvancedDataSourceV2[i#0, j#1]
== Optimized Logical Plan ==
Relation AdvancedDataSourceV2[j#1]
== Physical Plan ==
*(1) Scan AdvancedDataSourceV2[j#1]
```
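For context, output like the above comes from calling `explain(true)` on a query over a v2 source. A minimal sketch, assuming the `AdvancedDataSourceV2` test class from Spark's own DataSourceV2 test suite (the class and package names are taken from the plans above):
```
import org.apache.spark.sql.SparkSession

object ExplainDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explain-demo")
      .getOrCreate()

    // The format string is the fully qualified class name of a DataSourceV2
    // implementation; this one is a Spark test class, per the plans above.
    val df = spark.read
      .format("org.apache.spark.sql.sources.v2.AdvancedDataSourceV2")
      .load()

    df.select("j").explain(true) // extended = true prints all four plans

    spark.stop()
  }
}
```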
-------
Another example, with filter push-down:
```
== Analyzed Logical Plan ==
i: int, j: int
Filter (i#88 > 3)
+- Relation JavaAdvancedDataSourceV2[i#88, j#89]
== Optimized Logical Plan ==
Filter isnotnull(i#88)
+- Relation JavaAdvancedDataSourceV2[i#88, j#89] (PushedFilter: [GreaterThan(i,3)])
== Physical Plan ==
*(1) Filter isnotnull(i#88)
+- *(1) Scan JavaAdvancedDataSourceV2[i#88, j#89] (PushedFilter: [GreaterThan(i,3)])
```
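Judging from the outputs above, the readable form is essentially the source's simple class name plus the relation's output attributes, with pushed filters appended when present. A minimal, self-contained sketch of that rendering (the `Attribute` class and `relationString` helper are illustrative stand-ins, not Spark's actual internals):
```
// Stand-in for a v2 data source implementation class (hypothetical).
class AdvancedDataSourceV2

object RelationStringSketch {
  // Illustrative stand-in for a Catalyst attribute, printed as "name#exprId".
  final case class Attribute(name: String, id: Int) {
    override def toString: String = s"$name#$id"
  }

  // Render "Relation <SourceName>[attrs]" plus "(PushedFilter: [...])" when
  // filters were pushed to the source, mirroring the output shown above.
  def relationString(
      source: AnyRef,
      output: Seq[Attribute],
      pushedFilters: Seq[String] = Nil): String = {
    // getSimpleName keeps the trailing "$" of Scala objects, which would
    // explain "FakeDataSourceV2$[value#25]" in the streaming example below.
    val name = source.getClass.getSimpleName
    val filters =
      if (pushedFilters.isEmpty) ""
      else pushedFilters.mkString(" (PushedFilter: [", ", ", "])")
    s"Relation $name${output.mkString("[", ", ", "]")}$filters"
  }

  def main(args: Array[String]): Unit = {
    val output = Seq(Attribute("i", 88), Attribute("j", 89))
    println(relationString(new AdvancedDataSourceV2, output))
    // Relation AdvancedDataSourceV2[i#88, j#89]
    println(relationString(new AdvancedDataSourceV2, output, Seq("GreaterThan(i,3)")))
    // Relation AdvancedDataSourceV2[i#88, j#89] (PushedFilter: [GreaterThan(i,3)])
  }
}
```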
An example for a streaming query:
```
== Parsed Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming Relation FakeDataSourceV2$[value#25]
== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming Relation FakeDataSourceV2$[value#25]
== Optimized Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject value#25.toString, obj#4: java.lang.String
         +- Streaming Relation FakeDataSourceV2$[value#25]
== Physical Plan ==
*(4) HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#11L])
+- StateStoreSave [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *(3) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
      +- StateStoreRestore [value#6], state info [ checkpoint = *********(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5]
         +- *(2) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *(1) HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#16L])
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *(1) MapElements <function1>, obj#5: java.lang.String
                        +- *(1) DeserializeToObject value#25.toString, obj#4: java.lang.String
                           +- *(1) Scan FakeDataSourceV2$[value#25]
```
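For reference, a streaming query of that shape can be sketched as follows. This uses the built-in `rate` source in place of the `FakeDataSourceV2` test class from the plans above (whose package is not shown in the output), so the exact plan will differ slightly:
```
import org.apache.spark.sql.SparkSession

object StreamingExplainDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("streaming-explain-demo")
      .getOrCreate()
    import spark.implicits._

    val counts = spark.readStream
      .format("rate") // built-in source; the plans above used a fake v2 test source
      .load()
      .selectExpr("CAST(value AS STRING) AS value")
      .as[String]     // DeserializeToObject in the plans above
      .map(identity)  // MapElements <function1>
      .groupBy("value")
      .count()        // Aggregate [value#...], count(1)

    val query = counts.writeStream
      .outputMode("complete") // matches the Complete-mode StateStoreSave above
      .format("console")
      .start()

    query.awaitTermination(5000) // let a couple of micro-batches run...
    query.explain(true)          // ...then print the executed plans
    query.stop()
    spark.stop()
  }
}
```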
## How was this patch tested?
N/A
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/cloud-fan/spark explain
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/20647.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #20647
----
commit 138679ee8b713c20ac89d44a2e8e5d877772c69a
Author: Wenchen Fan <wenchen@...>
Date: 2018-02-13T05:12:22Z
improve data source v2 explain
----