[jira] [Updated] (SPARK-28771) Join partitioned dataframes on superset of partitioning columns without shuffle

2020-10-20 Thread Artem Bergkamp (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-28771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Artem Bergkamp updated SPARK-28771:
---
Description: 
Hi Spark developers.

A few months ago I asked this question on 
[stackoverflow|https://stackoverflow.com/questions/55229290/question-about-joining-dataframes-in-spark]
 but could not get a usable solution.
The only valid suggestion was to implement it via Catalyst optimizer 
extensions, but that is not something an ordinary user can do.
I decided to raise an improvement request, since I think such functionality 
should be available out of the box.
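
For context, the extension route would mean writing a custom rule in Scala 
against SparkSessionExtensions and registering it when the session is 
created; only the registration side is visible from PySpark. A minimal 
sketch, where com.example.JoinOnSupersetRule is a hypothetical class name 
used purely for illustration:
{code:java}
# Registration side only; the rule class itself would have to be implemented
# in Scala and placed on the driver/executor classpath.
# 'com.example.JoinOnSupersetRule' is hypothetical, for illustration.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .config('spark.sql.extensions', 'com.example.JoinOnSupersetRule') \
    .getOrCreate()
{code}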

Suppose I have two partitioned dataframes:
{code:java}
df1 = spark.createDataFrame(
    [(1, 1, 1), (2, 2, 2)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')
df2 = spark.createDataFrame(
    [(1, 1, 1)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')
{code}
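As a sanity check, each physical plan should contain an Exchange node for the 
explicit repartition, something like "Exchange hashpartitioning(key1, key2, 
3)" (the exact plan text varies by Spark version):
{code:java}
# Inspect the physical plans of the two inputs; the only Exchange in each
# should be the one introduced by the explicit repartition call.
df1.explain()
df2.explain()
{code}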
*(scenario 1)* If I join them on [key1, key2], the join is performed within 
each partition without a shuffle (the number of partitions in the result 
dataframe stays the same):
{code:java}
x = df1.join(df2, on=['key1', 'key2'], how='left')
assert x.rdd.getNumPartitions() == 3
{code}
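Here the only Exchange nodes in the plan of x should be the two explicit 
repartitions, i.e. the join itself adds no shuffle (again, plan text varies 
by Spark version):
{code:java}
# Both sides are already hash-partitioned on the join keys (key1, key2),
# so no additional Exchange appears above the join in the plan.
x.explain()
{code}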
*(scenario 2)* But if I join them on [key1, key2, time], a shuffle takes 
place (the number of partitions in the result dataframe is 10, driven by the 
spark.sql.shuffle.partitions option):
{code:java}
x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
assert x.rdd.getNumPartitions() == 10
{code}
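One workaround is to repartition both inputs on all three columns up front, 
which merely moves the shuffle instead of eliminating it; a sketch, assuming 
the extra repartition cost is acceptable:
{code:java}
# Hash-partition both sides on all three join keys ahead of the join, so the
# join's distribution requirement is already satisfied and the join itself
# adds no further shuffle. This trades one shuffle for another rather than
# removing it.
df1_r = df1.repartition(3, 'key1', 'key2', 'time')
df2_r = df2.repartition(3, 'key1', 'key2', 'time')
x = df1_r.join(df2_r, on=['key1', 'key2', 'time'], how='left')
assert x.rdd.getNumPartitions() == 3
{code}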
*(scenario 3)* Joining them on [key1, key2, time] via the overload of the 
join method that takes column expressions shuffles as well:
{code:java}
x = df1.join(df2, [
    df1['key1'] == df2['key1'],
    df1['key2'] == df2['key2'],
    df1['time'] == df2['time']
], how='left').drop(df2['key1']).drop(df2['key2']).drop(df2['time'])
assert x.rdd.getNumPartitions() == 10
{code}
*(scenario 4)* Joining them on [key1, key2, time] via the same overload, but 
with the equality condition on time replaced by an equivalent pair of 
inequalities, surprisingly *uses the existing partitioning*:
{code:java}
x = df1.join(df2, [
    df1['key1'] == df2['key1'],
    df1['key2'] == df2['key2'],
    (df1['time'] <= df2['time']) & (df1['time'] >= df2['time'])
], how='left').drop(df2['key1']).drop(df2['key2']).drop(df2['time'])
assert x.rdd.getNumPartitions() == 3
{code}
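The trick appears to work because Spark extracts only the two plain equality 
predicates as equi-join keys, so the required distribution stays (key1, key2) 
and the time comparison is evaluated as an extra join condition within each 
partition. Generalized into a hypothetical helper (not part of any Spark 
API), it looks roughly like this:
{code:java}
# Hypothetical helper: join on part_cols plus extra equality columns,
# rewriting each extra equality as a '<= and >=' pair so that the equi-join
# keys Spark extracts stay limited to the partitioning columns.
def join_on_superset(left, right, part_cols, extra_cols, how='left'):
    cond = [left[c] == right[c] for c in part_cols]
    cond += [(left[c] <= right[c]) & (left[c] >= right[c])
             for c in extra_cols]
    out = left.join(right, cond, how=how)
    for c in part_cols + extra_cols:
        out = out.drop(right[c])  # drop the duplicated right-side columns
    return out

x = join_on_superset(df1, df2, ['key1', 'key2'], ['time'])
assert x.rdd.getNumPartitions() == 3
{code}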
*I expect all four described join scenarios to use the existing partitioning 
and avoid a shuffle.*

At the same time, groupBy and window operations on [key1, key2, time] 
preserve the number of partitions and are performed without a shuffle.
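
A minimal check of the groupBy case, along the lines of the asserts above:
{code:java}
# groupBy on a superset of the partitioning columns reuses the existing
# HashPartitioning(key1, key2): no new Exchange is added and the partition
# count stays at 3.
g = df1.groupBy('key1', 'key2', 'time').count()
assert g.rdd.getNumPartitions() == 3
{code}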


[jira] [Updated] (SPARK-28771) Join partitioned dataframes on superset of partitioning columns without shuffle

2020-03-16 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-28771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-28771:
--
Affects Version/s: (was: 3.0.0)
   3.1.0

> Join partitioned dataframes on superset of partitioning columns without 
> shuffle
> ---
>
> Key: SPARK-28771
> URL: https://issues.apache.org/jira/browse/SPARK-28771
> Project: Spark
>  Issue Type: Improvement
>  Components: Optimizer, SQL
>Affects Versions: 3.1.0
> Environment:  
> {code:java}
> from pyspark import SparkConf
> from pyspark.sql import SparkSession
> conf = SparkConf()
> conf.setAll({
>   'spark.master': 'local[*]',
>   'spark.sql.execution.arrow.enabled': 'true',
>   'spark.sql.autoBroadcastJoinThreshold': '-1',
>   'spark.sql.shuffle.partitions': '10'
> }.items());
> spark = SparkSession.builder.config(conf=conf).getOrCreate()
> {code}
>  
>  
>Reporter: Artem Bergkamp
>Priority: Minor






[jira] [Updated] (SPARK-28771) Join partitioned dataframes on superset of partitioning columns without shuffle

2019-08-19 Thread Dongjoon Hyun (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-28771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun updated SPARK-28771:
--
Affects Version/s: (was: 2.4.3)
   3.0.0



