[GitHub] spark pull request #15605: [WIP] [SPARK-18067] [SQL] SortMergeJoin adds shuf...

2017-08-25 Thread tejasapatil
Github user tejasapatil closed the pull request at:

https://github.com/apache/spark/pull/15605



[GitHub] spark pull request #15605: [WIP] [SPARK-18067] [SQL] SortMergeJoin adds shuf...

2016-10-24 Thread tejasapatil
GitHub user tejasapatil reopened a pull request:

https://github.com/apache/spark/pull/15605

[WIP] [SPARK-18067] [SQL] SortMergeJoin adds shuffle if join predicates have non partitioned columns

## What changes were proposed in this pull request?

See https://issues.apache.org/jira/browse/SPARK-18067 for discussion. 
Putting out a PR to get some feedback about the approach.

Assume there are two tables with columns `key` and `value`, both hash partitioned on `key`, and that these are the partitions of the two children (listing the `key` values in each partition):

partitions | child 1 | child 2
--- | --- | ---
partition 0 | [0, 0, 0, 3] | [0, 0, 3, 3]
partition 1 | [1, 4, 4] | [4]
partition 2 | [2, 2] | [2, 5, 5, 5]

Since _all_ rows with a given value of `key` end up in the same partition on both sides, we can evaluate additional join predicates like `tableA.value = tableB.value` right there without needing any shuffle.

What is done today, i.e. requiring `HashPartitioning(key, value)`, expects rows with the same value of `pmod(hash(key, value))` to be in the same partition, and does not take advantage of the fact that rows with the same `key` are already packed together.
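
To make that concrete, here is a minimal plain-Scala sketch (not Spark's actual Murmur3-based hashing; the rows and partition count are made up): hashing only `key` keeps every row with a given `key` in one partition, while hashing `(key, value)` can scatter them.

```
// Non-negative modulus, mirroring what pmod does.
def pmod(h: Int, n: Int): Int = { val m = h % n; if (m < 0) m + n else m }

val rows = Seq((0, 10), (0, 20), (3, 30), (3, 40))   // (key, value) pairs
val numPartitions = 3

// Partition by key only: every row with key = 0 lands in the same partition,
// so a predicate like tableA.value = tableB.value can be checked locally.
val byKey = rows.groupBy { case (k, _) => pmod(k.##, numPartitions) }

// Partition by (key, value): rows sharing a key may land in different
// partitions, which is why requiring HashPartitioning(key, value) forces
// a shuffle even though the data is already bucketed by key.
val byKeyAndValue = rows.groupBy { case (k, v) => pmod((k, v).##, numPartitions) }

println(byKey)
println(byKeyAndValue)
```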

This PR uses a `PartitioningCollection` instead of `HashPartitioning` for the expected partitioning.
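
Roughly, the idea is to go from requiring a single `HashPartitioning` over all join keys to accepting a collection of partitionings. A hypothetical sketch in terms of Catalyst's partitioning classes (attribute names and the partition count of 200 are placeholders; this is not the PR's actual code):

```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, PartitioningCollection}
import org.apache.spark.sql.types.IntegerType

val i = AttributeReference("i", IntegerType)()
val j = AttributeReference("j", IntegerType)()

// Today: the join asks for rows hashed over all join keys (i, j), so a
// table bucketed only on i does not match and gets re-shuffled.
val strict = HashPartitioning(Seq(i, j), 200)

// Sketch of the idea: also accept hashing over just i, so the existing
// bucketing on i can already satisfy the join.
val relaxed = PartitioningCollection(Seq(
  HashPartitioning(Seq(i), 200),
  HashPartitioning(Seq(i, j), 200)))
```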

Query:

```
val df = (0 until 16).map(i => (i, i * 2)).toDF("i", "j").coalesce(1)
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "i").sortBy("i").saveAsTable("tableA")
df.write.format("org.apache.spark.sql.hive.orc.OrcFileFormat").bucketBy(8, "i").sortBy("i").saveAsTable("tableB")

hc.sql("SELECT * FROM tableA a JOIN tableB b ON a.i=b.i AND a.j=b.j").explain(true)
```
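
For reference, a rough DataFrame API equivalent of the same query (assuming a `SparkSession` named `spark` pointing at the same warehouse):

```
// Same join written against the saved tables with the DataFrame API.
val a = spark.table("tableA")
val b = spark.table("tableB")
a.join(b, a("i") === b("i") && a("j") === b("j")).explain(true)
```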

Before:

```
*SortMergeJoin [i#38, j#39], [i#40, j#41], Inner
:- *Sort [i#38 ASC NULLS FIRST, j#39 ASC NULLS FIRST], false, 0
:  +- Exchange hashpartitioning(i#38, j#39, 200)
:     +- *Project [i#38, j#39]
:        +- *Filter (isnotnull(i#38) && isnotnull(j#39))
:           +- *FileScan orc default.tablea[i#38,j#39] Batched: false, Format: ORC, Location: ListingFileCatalog[file:/Users/tejasp/Desktop/dev/tp-spark-2/spark/spark-warehouse/tablea], PartitionFilters: [], PushedFilters: [IsNotNull(i), IsNotNull(j)], ReadSchema: struct
+- *Sort [i#40 ASC NULLS FIRST, j#41 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(i#40, j#41, 200)
      +- *Project [i#40, j#41]
         +- *Filter (isnotnull(i#40) && isnotnull(j#41))
            +- *FileScan orc default.tableb[i#40,j#41] Batched: false, Format: ORC, Location: ListingFileCatalog[file:/Users/tejasp/Desktop/dev/tp-spark-2/spark/spark-warehouse/tableb], PartitionFilters: [], PushedFilters: [IsNotNull(i), IsNotNull(j)], ReadSchema: struct
```

After:

```
== Physical Plan ==
*SortMergeJoin [i#38, j#39], [i#40, j#41], Inner
:- *Sort [i#38 ASC NULLS FIRST, j#39 ASC NULLS FIRST], false, 0
:  +- *Project [i#38, j#39]
:     +- *Filter (isnotnull(j#39) && isnotnull(i#38))
:        +- *FileScan orc default.tablea[i#38,j#39] Batched: false, Format: ORC, Location: ListingFileCatalog[file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/tablea], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(i)], ReadSchema: struct
+- *Sort [i#40 ASC NULLS FIRST, j#41 ASC NULLS FIRST], false, 0
   +- *Project [i#40, j#41]
      +- *Filter (isnotnull(j#41) && isnotnull(i#40))
         +- *FileScan orc default.tableb[i#40,j#41] Batched: false, Format: ORC, Location: ListingFileCatalog[file:/Users/tejasp/Desktop/dev/tp-spark/spark-warehouse/tableb], PartitionFilters: [], PushedFilters: [IsNotNull(j), IsNotNull(i)], ReadSchema: struct
```

## How was this patch tested?

WIP. I need to add tests for:
- [ ] Check that the planner does not introduce an extra `Shuffle` for such a query (a rough sketch of this check is shown after this list)
- [ ] Check that the compatibility between `PartitioningCollection` and `HashPartitioning` makes sense
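
A rough sketch of the first check (hypothetical test name; assumes the bucketed tables from the query above already exist, and a ScalaTest-style `test` helper with a `spark` session in scope):

```
test("SMJ over bucketed tables adds no shuffle for extra join predicates") {
  val plan = spark.sql(
    "SELECT * FROM tableA a JOIN tableB b ON a.i = b.i AND a.j = b.j"
  ).queryExecution.executedPlan

  // Any shuffle shows up as an Exchange node in the physical plan.
  val exchanges = plan.collect { case node if node.nodeName.contains("Exchange") => node }
  assert(exchanges.isEmpty, "planner introduced an unexpected shuffle")
}
```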

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tejasapatil/spark SPARK-18067_smb_join_pred_avoid_shuffle

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/15605.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #15605


commit dac2f49b5dc66b67d632632907cc654f4d319434
Author: Tejas Patil 
Date:   2016-10-24T04:23:13Z

[SPARK-18067] [SQL] SortMergeJoin adds shuffle if join predicates have non partitioned columns





[GitHub] spark pull request #15605: [WIP] [SPARK-18067] [SQL] SortMergeJoin adds shuf...

2016-10-24 Thread tejasapatil
Github user tejasapatil closed the pull request at:

https://github.com/apache/spark/pull/15605





[GitHub] spark pull request #15605: [WIP] [SPARK-18067] [SQL] SortMergeJoin adds shuf...

2016-10-23 Thread tejasapatil
GitHub user tejasapatil opened a pull request:

https://github.com/apache/spark/pull/15605

[WIP] [SPARK-18067] [SQL] SortMergeJoin adds shuffle if join predicates have non partitioned columns
