Hi, I think this is related to the issue "Adaptive execution in Spark" (https://issues.apache.org/jira/browse/SPARK-9850). I will learn more about it.
------------------------------------------------------------------
From: 梅西0247 <zhen...@dtdream.com>
Sent: Tuesday, June 21, 2016, 10:31
To: Mich Talebzadeh <mich.talebza...@gmail.com>; Takeshi Yamamuro <linguin....@gmail.com>; Yong Zhang <java8...@hotmail.com>
Cc: user@spark.apache.org <user@spark.apache.org>
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

To Yong Zhang: Yes, a broadcast join hint works, but it is not what I want. Sometimes the result is really too big to broadcast. What I want is a more adaptive implementation.

------------------------------------------------------------------
From: Yong Zhang <java8...@hotmail.com>
Sent: Monday, June 20, 2016, 22:42
To: Mich Talebzadeh <mich.talebza...@gmail.com>; Takeshi Yamamuro <linguin....@gmail.com>
Cc: 梅西0247 <zhen...@dtdream.com>; user@spark.apache.org <user@spark.apache.org>
Subject: RE: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

If you are using Spark 1.5 or later, the best way is to use the DataFrame API directly instead of SQL. The DataFrame API lets you specify a broadcast join hint, which forces a broadcast join.

Yong

From: mich.talebza...@gmail.com
Date: Mon, 20 Jun 2016 13:09:17 +0100
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?
To: linguin....@gmail.com
CC: zhen...@dtdream.com; user@spark.apache.org

What sort of tables are these? Can you register the result set as a temp table and join on that, assuming the result set is going to be small?

s.filter(($"c2" < 1000)).registerTempTable("tmp")

and then do a join between tmp and Table2.

HTH

Dr Mich Talebzadeh
LinkedIn https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

On 20 June 2016 at 12:38, Takeshi Yamamuro <linguin....@gmail.com> wrote:

It seems hard to predict the output size of filters, because Spark currently has only limited statistics about the input data.
A few hours ago, Reynold created a ticket (https://issues.apache.org/jira/browse/SPARK-16026) for a cost-based optimizer framework. If you have ideas, questions, or suggestions, feel free to join the discussion.

// maropu

On Mon, Jun 20, 2016 at 8:21 PM, 梅西0247 <zhen...@dtdream.com> wrote:

Thanks for your reply. In fact, that is what I just did. But my question is: can we make Spark's join behavior smarter, so that it turns a SortMergeJoin into a BroadcastHashJoin automatically when it finds that an output RDD is small enough?

------------------------------------------------------------------
From: Takeshi Yamamuro <linguin....@gmail.com>
Sent: Monday, June 20, 2016, 19:16
To: 梅西0247 <zhen...@dtdream.com>
Cc: user <user@spark.apache.org>
Subject: Re: Is it possible to turn a SortMergeJoin into BroadcastHashJoin?

Hi,

How about caching the result of `select * from a where a.c2 < 1000`, then joining them? You probably need to tune `spark.sql.autoBroadcastJoinThreshold` to enable broadcast joins for the result table.

// maropu

On Mon, Jun 20, 2016 at 8:06 PM, 梅西0247 <zhen...@dtdream.com> wrote:

Hi everyone,

I ran a SQL join statement on Spark 1.6.1 like this:

select * from table1 a join table2 b on a.c1 = b.c1 where a.c2 < 1000;

It took quite a long time because it is a SortMergeJoin and the two tables are big. In fact, the result of the filter (select * from a where a.c2 < 1000) is very small, and I think a better solution is to use a broadcast join with the filter result, but I know the physical plan is static and won't be changed. So, can we make the physical plan more adaptive? (In this example, I mean using a BroadcastHashJoin instead of a SortMergeJoin automatically.)

--
---
Takeshi Yamamuro
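
For reference, the two manual workarounds suggested in this thread (the DataFrame broadcast hint, and caching the filtered result while tuning `spark.sql.autoBroadcastJoinThreshold`) might look roughly like the sketch below on Spark 1.6. The tiny in-memory stand-ins for table1 and table2, the object name, the local master setting, and the 10 MB threshold value are all illustrative assumptions, not details from the thread. Whether the second variant actually yields a broadcast join depends on the size statistics Spark has for the cached data, so it is worth checking the printed plan.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.broadcast

// Hypothetical object name; a sketch against the Spark 1.6 API, not a tuned job.
object BroadcastJoinSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("broadcast-join-sketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // Assumption: tiny stand-ins for the real (large) table1 and table2.
    val table1 = sc.parallelize(Seq((1, 10), (2, 5000), (3, 999))).toDF("c1", "c2")
    val table2 = sc.parallelize(Seq((1, "x"), (2, "y"), (3, "z"))).toDF("c1", "c3")

    // Workaround 1: explicit broadcast hint on the filtered side.
    // The caller asserts that `small` fits in memory on every executor.
    val small = table1.filter($"c2" < 1000)
    val hinted = broadcast(small).join(table2, small("c1") === table2("c1"))
    hinted.explain() // inspect which join operator the planner chose

    // Workaround 2: cache the filtered result, register it as a temp table,
    // and let the planner decide based on the broadcast threshold.
    // The 10 MB value here is only an illustrative choice.
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10 * 1024 * 1024).toString)
    small.cache()
    small.count() // materialize the cache before planning the join
    small.registerTempTable("tmp")
    table2.registerTempTable("table2")
    val viaSql = sqlContext.sql("select * from tmp a join table2 b on a.c1 = b.c1")
    viaSql.explain() // check whether a broadcast join was picked

    sc.stop()
  }
}
```

Neither variant is adaptive in the sense asked for in the original question: in both cases the decision is made before execution, either by the user (the hint) or from whatever size estimate is available at planning time (the threshold).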