spark git commit: [SPARK-23303][SQL] improve the explain result for data source v2 relations

2018-03-05 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 8c5b34c42 -> ad640a5af


[SPARK-23303][SQL] improve the explain result for data source v2 relations

## What changes were proposed in this pull request?

The proposed explain format:
**[streaming header] [RelationV2/ScanV2] [data source name] [output] [pushed filters] [options]**

**streaming header**: if it is a streaming relation, put "Streaming" at the beginning.
**RelationV2/ScanV2**: "RelationV2" for a logical plan node, "ScanV2" for a physical scan node.
**data source name**: the simple class name of the data source implementation.
**output**: a string of the plan's output attributes.
**pushed filters**: a string of all the filters that have been pushed to this data source.
**options**: all the options used to create the data source reader.
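
As an illustration of how these parts compose into a single line, here is a minimal Scala sketch; `V2ExplainInfo` and `explainLine` are hypothetical names for this example only, not the actual `DataSourceV2Relation` code:

```
// Hypothetical helper mirroring the proposed format. This is only an
// illustration of the composition rules, not the Spark implementation.
case class V2ExplainInfo(
    isStreaming: Boolean,          // adds the "Streaming" header
    isPhysical: Boolean,           // RelationV2 (logical) vs ScanV2 (physical)
    sourceName: String,            // simple class name of the data source
    output: Seq[String],           // plan output attributes, e.g. "i#0"
    pushedFilters: Seq[String],    // filters pushed to this source
    options: Map[String, String])  // options used to create the reader

def explainLine(info: V2ExplainInfo): String = {
  val header = if (info.isStreaming) "Streaming " else ""
  val node = if (info.isPhysical) "ScanV2" else "RelationV2"
  val filters =
    if (info.pushedFilters.isEmpty) ""
    else s" (Pushed Filters: [${info.pushedFilters.mkString(", ")}])"
  val opts =
    if (info.options.isEmpty) ""
    else s" (Options: ${info.options.map { case (k, v) => s"$k=$v" }.mkString(", ")})"
  s"$header$node ${info.sourceName}[${info.output.mkString(", ")}]$filters$opts"
}

// explainLine(V2ExplainInfo(false, true, "JavaAdvancedDataSourceV2",
//   Seq("i#88", "j#89"), Seq("GreaterThan(i,3)"), Map.empty))
// => ScanV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])
```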

The current explain result for a data source v2 relation is unreadable:
```
== Parsed Logical Plan ==
'Filter ('i > 6)
+- AnalysisBarrier
      +- Project [j#1]
         +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Filter (i#0 > 6)
   +- Project [j#1, i#0]
      +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940

== Optimized Logical Plan ==
Project [j#1]
+- Filter isnotnull(i#0)
   +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940

== Physical Plan ==
*(1) Project [j#1]
+- *(1) Filter isnotnull(i#0)
   +- *(1) DataSourceV2Scan [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
```

After this PR:
```
== Parsed Logical Plan ==
'Project [unresolvedalias('j, None)]
+- AnalysisBarrier
      +- RelationV2 AdvancedDataSourceV2[i#0, j#1]

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- RelationV2 AdvancedDataSourceV2[i#0, j#1]

== Optimized Logical Plan ==
RelationV2 AdvancedDataSourceV2[j#1]

== Physical Plan ==
*(1) ScanV2 AdvancedDataSourceV2[j#1]
```
---
```
== Analyzed Logical Plan ==
i: int, j: int
Filter (i#88 > 3)
+- RelationV2 JavaAdvancedDataSourceV2[i#88, j#89]

== Optimized Logical Plan ==
Filter isnotnull(i#88)
+- RelationV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])

== Physical Plan ==
*(1) Filter isnotnull(i#88)
+- *(1) ScanV2 JavaAdvancedDataSourceV2[i#88, j#89] (Pushed Filters: [GreaterThan(i,3)])
```
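
For context, plans like the above are printed by `Dataset.explain(true)`. A hedged sketch of how to reproduce them (it assumes the `AdvancedDataSourceV2` test source from the Spark test suite, whose fully-qualified name appears in the plans above, is on the classpath; any v2 source with filter push-down behaves similarly):

```
import org.apache.spark.sql.SparkSession

// A sketch only: AdvancedDataSourceV2 is a DataSourceV2 test class in the
// Spark repo; substitute any v2 implementation that supports push-down.
val spark = SparkSession.builder().master("local[*]").appName("explain-v2").getOrCreate()
import spark.implicits._

val df = spark.read
  .format("org.apache.spark.sql.sources.v2.AdvancedDataSourceV2")
  .load()

// explain(true) prints all four plans; once a filter such as GreaterThan(i,3)
// is pushed to the source, the relation line carries the
// "(Pushed Filters: ...)" suffix.
df.filter($"i" > 3).explain(true)
```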

An example for a streaming query:
```
== Parsed Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Optimized Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject value#25.toString, obj#4: java.lang.String
         +- Streaming RelationV2 MemoryStreamDataSource[value#25]

== Physical Plan ==
*(4) HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#11L])
+- StateStoreSave [value#6], state info [ checkpoint = *(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *(3) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
      +- StateStoreRestore [value#6], state info [ checkpoint = *(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5]
         +- *(2) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *(1) HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#16L])
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *(1) MapElements <function1>, obj#5: java.lang.String
                        +- *(1) DeserializeToObject value#25.toString, obj#4: java.lang.String
                           +- *(1) ScanV2 MemoryStreamDataSource[value#25]
```

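For reference, a streaming query of roughly the shape shown above can be built as follows. This is a hedged sketch: `MemoryStream` is an internal Spark test utility, and the exact operators, expression IDs, and checkpoint paths will differ locally.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

val spark = SparkSession.builder().master("local[*]").appName("explain-stream").getOrCreate()
import spark.implicits._
// MemoryStream needs an implicit SQLContext (test-utility API).
implicit val sqlContext: org.apache.spark.sql.SQLContext = spark.sqlContext

val input = MemoryStream[String]
val counts = input.toDS()
  .map(identity)   // appears as DeserializeToObject/MapElements/SerializeFromObject
  .groupBy("value")
  .count()         // the streaming aggregate wrapped in StateStoreRestore/StateStoreSave

val query = counts.writeStream
  .outputMode("complete")
  .format("memory")
  .queryName("counts")
  .start()

input.addData("a", "b", "a")
query.processAllAvailable()
query.explain(extended = true)  // prints plans like the ones above
query.stop()
```
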
spark git commit: [SPARK-23303][SQL] improve the explain result for data source v2 relations

2018-02-12 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master ed4e78bd6 -> f17b936f0


[SPARK-23303][SQL] improve the explain result for data source v2 relations

## What changes were proposed in this pull request?

The current explain result for a data source v2 relation is unreadable:
```
== Parsed Logical Plan ==
'Filter ('i > 6)
+- AnalysisBarrier
      +- Project [j#1]
         +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Filter (i#0 > 6)
   +- Project [j#1, i#0]
      +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940

== Optimized Logical Plan ==
Project [j#1]
+- Filter isnotnull(i#0)
   +- DataSourceV2Relation [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940

== Physical Plan ==
*(1) Project [j#1]
+- *(1) Filter isnotnull(i#0)
   +- *(1) DataSourceV2Scan [i#0, j#1], org.apache.spark.sql.sources.v2.AdvancedDataSourceV2$Reader@3b415940
```

After this PR:
```
== Parsed Logical Plan ==
'Project [unresolvedalias('j, None)]
+- AnalysisBarrier
      +- Relation AdvancedDataSourceV2[i#0, j#1]

== Analyzed Logical Plan ==
j: int
Project [j#1]
+- Relation AdvancedDataSourceV2[i#0, j#1]

== Optimized Logical Plan ==
Relation AdvancedDataSourceV2[j#1]

== Physical Plan ==
*(1) Scan AdvancedDataSourceV2[j#1]
```
---
```
== Analyzed Logical Plan ==
i: int, j: int
Filter (i#88 > 3)
+- Relation JavaAdvancedDataSourceV2[i#88, j#89]

== Optimized Logical Plan ==
Filter isnotnull(i#88)
+- Relation JavaAdvancedDataSourceV2[i#88, j#89] (PushedFilter: [GreaterThan(i,3)])

== Physical Plan ==
*(1) Filter isnotnull(i#88)
+- *(1) Scan JavaAdvancedDataSourceV2[i#88, j#89] (PushedFilter: [GreaterThan(i,3)])
```

An example for a streaming query:
```
== Parsed Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming Relation FakeDataSourceV2$[value#25]

== Analyzed Logical Plan ==
value: string, count(1): bigint
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject cast(value#25 as string).toString, obj#4: java.lang.String
         +- Streaming Relation FakeDataSourceV2$[value#25]

== Optimized Logical Plan ==
Aggregate [value#6], [value#6, count(1) AS count(1)#11L]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
   +- MapElements <function1>, class java.lang.String, [StructField(value,StringType,true)], obj#5: java.lang.String
      +- DeserializeToObject value#25.toString, obj#4: java.lang.String
         +- Streaming Relation FakeDataSourceV2$[value#25]

== Physical Plan ==
*(4) HashAggregate(keys=[value#6], functions=[count(1)], output=[value#6, count(1)#11L])
+- StateStoreSave [value#6], state info [ checkpoint = *(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5], Complete, 0
   +- *(3) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
      +- StateStoreRestore [value#6], state info [ checkpoint = *(redacted)/cloud/dev/spark/target/tmp/temporary-549f264b-2531-4fcb-a52f-433c77347c12/state, runId = f84d9da9-2f8c-45c1-9ea1-70791be684de, opId = 0, ver = 0, numPartitions = 5]
         +- *(2) HashAggregate(keys=[value#6], functions=[merge_count(1)], output=[value#6, count#16L])
            +- Exchange hashpartitioning(value#6, 5)
               +- *(1) HashAggregate(keys=[value#6], functions=[partial_count(1)], output=[value#6, count#16L])
                  +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#6]
                     +- *(1) MapElements <function1>, obj#5: java.lang.String
                        +- *(1) DeserializeToObject value#25.toString, obj#4: java.lang.String
                           +- *(1) Scan FakeDataSourceV2$[value#25]
```
## How was this patch tested?

N/A

Author: Wenchen Fan 

Closes #20477 from cloud-fan/explain.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo