[ https://issues.apache.org/jira/browse/SPARK-15326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Herman van Hovell closed SPARK-15326.
-------------------------------------
    Resolution: Not A Problem

> Doing multiple unions on a Dataframe will result in a very inefficient query plan
> ---------------------------------------------------------------------------------
>
>                 Key: SPARK-15326
>                 URL: https://issues.apache.org/jira/browse/SPARK-15326
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 1.6.1, 2.0.0
>            Reporter: Jurriaan Pruis
>         Attachments: Query Plan.pdf, skewed_join.py, skewed_join_plan.txt
>
> While working with a very skewed dataset I noticed that repeated unions on a dataframe will result in a query plan with 2^(unions) - 1 unions. With large datasets this will be very inefficient.
> I tried to replicate this behaviour using a PySpark example (I've attached the output of the explain() to this JIRA):
> {code}
> df = sqlCtx.range(10000000)
>
> def r(name, max_val=100):
>     return F.round(F.lit(max_val) * F.pow(F.rand(), 4)).cast('integer').alias(name)
>
> # Create a skewed dataset
> skewed = df.select('id', r('a'), r('b'), r('c'), r('d'), r('e'), r('f'))
>
> # Find the skewed values in the dataset
> top_10_percent = skewed.freqItems(['a', 'b', 'c', 'd', 'e', 'f'], 0.10).collect()[0]
>
> def skewjoin(skewed, right, column, freqItems):
>     freqItems = freqItems[column + '_freqItems']
>     skewed = skewed.alias('skewed')
>     cond = F.col(column).isin(freqItems)
>     # First broadcast join the frequent (skewed) values
>     filtered = skewed.filter(cond).join(F.broadcast(right.filter(cond)), column, 'left_outer')
>     # Use a regular join for the non-skewed values (with big tables this will use a SortMergeJoin)
>     non_skewed = skewed.filter(cond == False).join(right.filter(cond == False), column, 'left_outer')
>     # Union them together and replace the column with the column found in the right DataFrame
>     return filtered.unionAll(non_skewed).select('skewed.*', right['id'].alias(column + '_key')).drop(column)
>
> # Create the dataframes that will be joined to the skewed dataframe
> right_size = 100
> df_a = sqlCtx.range(right_size).select('id', F.col('id').alias('a'))
> df_b = sqlCtx.range(right_size).select('id', F.col('id').alias('b'))
> df_c = sqlCtx.range(right_size).select('id', F.col('id').alias('c'))
> df_d = sqlCtx.range(right_size).select('id', F.col('id').alias('d'))
> df_e = sqlCtx.range(right_size).select('id', F.col('id').alias('e'))
> df_f = sqlCtx.range(right_size).select('id', F.col('id').alias('f'))
>
> # Join everything together
> df = skewed
> df = skewjoin(df, df_a, 'a', top_10_percent)
> df = skewjoin(df, df_b, 'b', top_10_percent)
> df = skewjoin(df, df_c, 'c', top_10_percent)
> df = skewjoin(df, df_d, 'd', top_10_percent)
> df = skewjoin(df, df_e, 'e', top_10_percent)
> df = skewjoin(df, df_f, 'f', top_10_percent)
>
> # df.explain() shows the plan where it does 63 unions (2^(number_of_skewjoins) - 1)
> # which will be very inefficient and slow
> df.explain(True)
>
> # Evaluate the plan
> # You'd expect this to return 10000000, but it does not, it returned 10000140 on my system
> # (probably because it will recalculate the random columns? Not sure though)
> print(df.count())
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
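The 63 unions in the reported plan follow a simple doubling recurrence: each skewjoin splits the current dataframe into two filtered branches and unions them back, so both branches carry a full copy of the existing plan and a plan containing u unions becomes one with 2u + 1. A minimal sketch of that arithmetic (plain Python, no Spark needed; the recurrence is my reading of the attached plan, not code from the report):

```python
def unions_in_plan(num_skewjoins):
    """Count Union nodes after repeated skewjoin-style self-unions.

    Each skewjoin filters the same dataframe twice and unions the two
    halves, so every Union already in the plan is duplicated and one new
    Union is added: u -> 2*u + 1.  Closed form: 2**n - 1.
    """
    unions = 0
    for _ in range(num_skewjoins):
        unions = 2 * unions + 1
    return unions

# The report chains 6 skewjoins, matching the 63 unions seen in explain():
print(unions_in_plan(6))  # → 63, i.e. 2**6 - 1
```

This is why the plan size grows exponentially in the number of chained skewjoins rather than linearly.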