[jira] [Updated] (SPARK-28771) Join partitioned dataframes on superset of partitioning columns without shuffle
[ https://issues.apache.org/jira/browse/SPARK-28771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Artem Bergkamp updated SPARK-28771:
-----------------------------------
    Description:

Hi Spark developers.

A few months ago I asked this question at [stackoverflow|https://stackoverflow.com/questions/55229290/question-about-joining-dataframes-in-spark] but could not get a usable solution. The only valid suggestion was to implement it via Catalyst optimizer extensions, but that is not something an ordinary user can do. I decided to raise an improvement request since I think such functionality should be available out of the box.

Suppose I have two partitioned dataframes:
{code:java}
df1 = spark.createDataFrame(
    [(1,1,1), (2,2,2)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')

df2 = spark.createDataFrame(
    [(1,1,1)], ['key1', 'key2', 'time']
).repartition(3, 'key1', 'key2')
{code}
*(scenario 1)* If I join them by [key1, key2], the join is performed within each partition without a shuffle (the number of partitions in the result dataframe is unchanged):
{code:java}
x = df1.join(df2, on=['key1', 'key2'], how='left')
assert x.rdd.getNumPartitions() == 3
{code}
*(scenario 2)* But if I join them by [key1, key2, time], a shuffle takes place (the result dataframe has 10 partitions, which is driven by the spark.sql.shuffle.partitions option):
{code:java}
x = df1.join(df2, on=['key1', 'key2', 'time'], how='left')
assert x.rdd.getNumPartitions() == 10
{code}
*(scenario 3)* Joining them by [key1, key2, time] via the other variant of the join method also shuffles:
{code:java}
x = df1.join(df2, [
    df1['key1'] == df2['key1'],
    df1['key2'] == df2['key2'],
    df1['time'] == df2['time']
], how='left').drop(df2['key1']).drop(df2['key2']).drop(df2['time'])
assert x.rdd.getNumPartitions() == 10
{code}
*(scenario 4)* Joining them by [key1, key2, time] via the same variant of the join method, but with the equality condition on time rewritten as an equivalent pair of inequalities, surprisingly *uses the existing partitioning*:
{code:java}
x = df1.join(df2, [
    df1['key1'] == df2['key1'],
    df1['key2'] == df2['key2'],
    (df1['time'] <= df2['time']) & (df1['time'] >= df2['time'])
], how='left').drop(df2['key1']).drop(df2['key2']).drop(df2['time'])
assert x.rdd.getNumPartitions() == 3
{code}
(Presumably this works because only key1 and key2 are extracted as equi-join keys, while the time inequalities are evaluated as a residual join condition, so the existing partitioning on [key1, key2] already satisfies the join's distribution requirement.)

*I expect all four described join scenarios to use the existing partitioning and avoid a shuffle.*

At the same time, groupby and window operations by [key1, key2, time] preserve the number of partitions and are done without a shuffle.
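For comparison, here is a minimal sketch of the aggregation case just mentioned (an illustration, not from the original report; it assumes the same spark session and the df1 defined above): the grouping keys are a superset of the partitioning keys, yet the partition count is preserved.
{code:java}
# groupBy on [key1, key2, time] can reuse the partitioning by
# [key1, key2]: rows with equal grouping keys are already co-located,
# so no exchange is needed and the 3 partitions survive.
y = df1.groupBy('key1', 'key2', 'time').count()
assert y.rdd.getNumPartitions() == 3
{code}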
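A possible workaround sketch for scenarios 2 and 3 (not part of the report; it assumes both inputs can be repartitioned with identical keys and partition counts): shuffle both inputs once on all three keys so they are co-partitioned, after which the join itself should add no further exchange.
{code:java}
# Explicitly co-partition both inputs on all three join keys.
# The repartition is itself a shuffle, but it is paid once up front;
# the subsequent join can then reuse the co-partitioning.
df1r = df1.repartition(3, 'key1', 'key2', 'time')
df2r = df2.repartition(3, 'key1', 'key2', 'time')

x = df1r.join(df2r, on=['key1', 'key2', 'time'], how='left')
assert x.rdd.getNumPartitions() == 3
{code}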
[jira] [Updated] (SPARK-28771) Join partitioned dataframes on superset of partitioning columns without shuffle
[ https://issues.apache.org/jira/browse/SPARK-28771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-28771:
----------------------------------
    Affects Version/s:     (was: 3.0.0)
                       3.1.0

> Join partitioned dataframes on superset of partitioning columns without shuffle
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-28771
>                 URL: https://issues.apache.org/jira/browse/SPARK-28771
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer, SQL
>    Affects Versions: 3.1.0
>          Environment:
> {code:java}
> from pyspark import SparkConf
> from pyspark.sql import SparkSession
>
> conf = SparkConf()
> conf.setAll({
>     'spark.master': 'local[*]',
>     'spark.sql.execution.arrow.enabled': 'true',
>     'spark.sql.autoBroadcastJoinThreshold': '-1',
>     'spark.sql.shuffle.partitions': '10'
> }.items())
> spark = SparkSession.builder.config(conf=conf).getOrCreate()
> {code}
>             Reporter: Artem Bergkamp
>             Priority: Minor
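With the configuration above (broadcast joins disabled, 10 shuffle partitions), a direct way to see the planner's decision, instead of inferring it from partition counts, is to print the physical plan (a sketch; the exact output shape varies by Spark version):
{code:java}
# An "Exchange hashpartitioning(...)" node feeding the join indicates
# a shuffle; its absence means the existing partitioning was reused.
df1.join(df2, on=['key1', 'key2'], how='left').explain()
df1.join(df2, on=['key1', 'key2', 'time'], how='left').explain()
{code}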
[jira] [Updated] (SPARK-28771) Join partitioned dataframes on superset of partitioning columns without shuffle
[ https://issues.apache.org/jira/browse/SPARK-28771?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dongjoon Hyun updated SPARK-28771:
----------------------------------
    Affects Version/s:     (was: 2.4.3)
                       3.0.0

> Join partitioned dataframes on superset of partitioning columns without shuffle
> --------------------------------------------------------------------------------
>
>                 Key: SPARK-28771
>                 URL: https://issues.apache.org/jira/browse/SPARK-28771
>             Project: Spark
>          Issue Type: Improvement
>          Components: Optimizer, SQL
>    Affects Versions: 3.0.0
>             Reporter: Artem Bergkamp
>             Priority: Minor