Hi Jacek,
I haven't played with 2.1.0 yet, so I'm not sure how much more optimized Window functions are compared to 1.6 and 2.0.

However, one thing I do see in the self-join is a broadcast. The results of the groupBy will have to be broadcast out to the executors before the join can run. In both cases the data is shuffled (for the groupBy or for the Window).

Have you tried running both queries to see? It would be interesting to test them on varying data volumes as well (e.g. what happens if there's no broadcast).

Thanks,
Silvio

________________________________
From: Jacek Laskowski <ja...@japila.pl>
Sent: Wednesday, November 9, 2016 7:36:47 AM
To: user
Subject: Physical plan for windows and joins - how to know which is faster?

Hi,

While playing around with Spark 2.1.0-SNAPSHOT (built today) and explain'ing two queries, one with a WindowSpec and one with an inner join, I found the following plans and am wondering if you could help me judge which query could be faster. What else would you ask for to be able to say whether one is more efficient than the other?

Just by looking at Spark's "stack traces" of the queries, one could say that the windowed variant (the first one) is going to be faster, as it has fewer physical operators. Yet the top-level Window operator is not codegened, so that impression might be misleading.

I'd appreciate your help to get me better at reading such trees.

Thanks!

scala> mydf.withColumn("sum(id)", sum('id) over byId3).explain
== Physical Plan ==
Window [sum(cast(id#15 as bigint)) windowspecdefinition(ID % 3#60, ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING) AS sum(id)#665L], [ID % 3#60]
+- *Sort [ID % 3#60 ASC NULLS FIRST], false, 0
   +- Exchange hashpartitioning(ID % 3#60, 200)
      +- LocalTableScan [id#15, multiplied#16, ID % 3#60]

scala> mydf.join(mydf.groupBy("ID % 3").sum("id"), "ID % 3").explain
== Physical Plan ==
*Project [ID % 3#60, id#15, multiplied#16, sum(id)#677L]
+- *BroadcastHashJoin [ID % 3#60], [ID % 3#681], Inner, BuildRight
   :- *Project [_1#12 AS id#15, _2#13 AS multiplied#16, (_1#12 % 3) AS ID % 3#60]
   :  +- *Filter isnotnull((_1#12 % 3))
   :     +- LocalTableScan [_1#12, _2#13]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)))
      +- *HashAggregate(keys=[ID % 3#681], functions=[sum(cast(id#15 as bigint))])
         +- Exchange hashpartitioning(ID % 3#681, 200)
            +- *HashAggregate(keys=[ID % 3#681], functions=[partial_sum(cast(id#15 as bigint))])
               +- *Project [_1#12 AS id#15, (_1#12 % 3) AS ID % 3#681]
                  +- *Filter isnotnull((_1#12 % 3))
                     +- LocalTableScan [_1#12, _2#13]

Regards,
Jacek Laskowski
----
https://medium.com/@jaceklaskowski/
Mastering Apache Spark 2.0 https://bit.ly/mastering-apache-spark
Follow me at https://twitter.com/jaceklaskowski
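
One way to act on the suggestion in the reply (run both queries, and also see what happens without the broadcast) is sketched below. This is only a rough timing harness, not a proper benchmark. The names WindowVsJoin, sample, windowed, joined and timeMs are made up for illustration, the data set built from spark.range is a stand-in for mydf (so the leaf of the plan will be a Range rather than a LocalTableScan), and the row count is arbitrary. Setting spark.sql.autoBroadcastJoinThreshold to -1 is the documented way to turn automatic broadcast joins off.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum}

object WindowVsJoin {
  // Assumed stand-in for mydf: columns id, multiplied and "ID % 3".
  def sample(spark: SparkSession, n: Long): DataFrame =
    spark.range(n).toDF("id")
      .withColumn("multiplied", col("id") * 2)
      .withColumn("ID % 3", col("id") % 3)

  // Windowed variant: per-partition sum attached to every row.
  def windowed(df: DataFrame): DataFrame = {
    val byId3 = Window.partitionBy(col("ID % 3"))
    df.withColumn("sum(id)", sum(col("id")) over byId3)
  }

  // Self-join variant: aggregate first, then join the sums back.
  def joined(df: DataFrame): DataFrame =
    df.join(df.groupBy("ID % 3").sum("id"), "ID % 3")

  // Hypothetical helper: wall-clock time of a block in milliseconds.
  def timeMs(body: => Unit): Double = {
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1e6
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]").appName("window-vs-join").getOrCreate()
    val df = sample(spark, 1000000L)

    // Run once with the default threshold, once with broadcast joins disabled.
    for (threshold <- Seq(None, Some("-1"))) {
      threshold.foreach(t => spark.conf.set("spark.sql.autoBroadcastJoinThreshold", t))
      // rdd.foreach touches every row, so neither query can be pruned away.
      val w = timeMs(windowed(df).rdd.foreach(_ => ()))
      val j = timeMs(joined(df).rdd.foreach(_ => ()))
      println(f"threshold=${threshold.getOrElse("default")}: window=$w%.0f ms, join=$j%.0f ms")
    }
    spark.stop()
  }
}
```

Calling joined(df).explain after the threshold is set to -1 should show the BroadcastHashJoin replaced by a different join strategy, which makes the two plans easier to compare on shuffle cost alone. Timings from a single local run are noisy, so repeating the loop and varying the row count gives a more useful picture.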