Re: How to do map join in Spark SQL
spark.sql.autoBroadcastJoinThreshold default value in 1.5.2 is 10MB. According to the console output Spark is doing the broadcast, but a query which looks like the following does not perform well:

select big_t.*, small_t.name range_name
from big_t join small_t on (1=1)
where small_t.min_v <= big_t.v and big_t.v < small_t.max_v

Instead of it I registered a UDF which returns range_name:

val ranges = sqlContext.sql("select min_v, max_v, name from small_t")
  .collect()
  .map(r => (r.getLong(0), r.getLong(1), r.getString(2)))
  .sortBy(_._1)

sqlContext.udf.register("findRangeName",
  (v: java.lang.Long) => RangeUDF.findName(v, ranges))

// RangeUDF.findName
def findName(vObj: java.lang.Long, ranges: Array[(Long, Long, String)]): String = {
  val v = if (vObj == null) -1L else vObj.longValue()
  ranges.find(x => x._1 <= v && v < x._2).map(_._3).getOrElse("")
}

Now I can use the UDF to get range_name:

select big_t.*, findRangeName(v) range_name from big_t

On Sun, Dec 20, 2015 at 9:16 AM, Chris Fregly <ch...@fregly.com> wrote:
> this type of broadcast should be handled by Spark SQL/DataFrames
> automatically. [...]
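Since the collected ranges array is sorted by its lower bound (the sortBy(_._1) above), the linear scan in findName could be swapped for a binary search if the ranges table ever grows beyond a handful of rows. A minimal, Spark-free sketch; findNameSorted and the sample ranges here are illustrative, not from the original post:

```scala
// Hedged sketch: because `ranges` is sorted by its lower bound, a binary
// search over the lower bounds can replace the linear ranges.find(...) scan.
def findNameSorted(vObj: java.lang.Long, ranges: Array[(Long, Long, String)]): String = {
  val v = if (vObj == null) -1L else vObj.longValue()
  // find the index of the last range whose lower bound is <= v
  var lo = 0; var hi = ranges.length - 1; var idx = -1
  while (lo <= hi) {
    val mid = (lo + hi) / 2
    if (ranges(mid)._1 <= v) { idx = mid; lo = mid + 1 } else hi = mid - 1
  }
  // candidate found: check the upper bound, else no range matches
  if (idx >= 0 && v < ranges(idx)._2) ranges(idx)._3 else ""
}

// Sample data mirroring the json file in the original question.
val ranges = Array((0L, 10L, "A"), (10L, 20L, "B"), (20L, 30L, "C"))
println(findNameSorted(12L, ranges)) // B
```

For the 8-row table in this thread the linear find is just as good; the point is only that the sorted invariant makes an O(log n) lookup available.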
Re: How to do map join in Spark SQL
this type of broadcast should be handled by Spark SQL/DataFrames automatically.

this is the primary cost-based, physical-plan query optimization that the Spark SQL Catalyst optimizer supports.

in Spark 1.5 and before, you can trigger this optimization by properly setting spark.sql.autoBroadcastJoinThreshold to a value that is *above* the size of your smaller table when fully materialized in JVM memory (not the serialized size of the data on disk - a very common mistake).

in Spark 1.6+, there are heuristics to make this decision dynamically - and even allow hybrid execution where certain keys - within the same Spark job - will be broadcast and others won't, depending on their relative "hotness" for that particular job.

a common theme of Spark 1.6 and beyond will be adaptive physical plan execution, adaptive memory allocation to RDD cache vs the Spark execution engine, adaptive cluster resource allocation, etc.

the goal being to minimize manual configuration and enable many different types of workloads to run efficiently on the same Spark cluster.

> On Dec 19, 2015, at 12:10 PM, Alexander Pivovarov <apivova...@gmail.com> wrote:
>
> I collected small DF to array of tuple3 [...]
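Concretely, the knob described above can be set on the SQLContext. A minimal config sketch, not taken from this thread; the 100 MB figure is purely illustrative, and the value Spark compares against it is its estimate of the table's in-memory (deserialized) size:

```scala
// Make the small ranges table eligible for auto-broadcast by raising the
// threshold above its in-memory size. The value is in bytes.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (100L * 1024 * 1024).toString)

// To go back to the 1.5.2 default of 10 MB:
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10L * 1024 * 1024).toString)

// Setting it to -1 disables auto-broadcast entirely.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
```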
Re: How to do map join in Spark SQL
I collected the small DF to an array of tuple3.
Then I registered a UDF with a function which does a lookup in the array.
Then I just run a select which uses the UDF.

On Dec 18, 2015 1:06 AM, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:
> You can broadcast your json data and then do a map side join. This article
> is a good start: http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/ [...]
Re: How to do map join in Spark SQL
You can broadcast your json data and then do a map-side join. This article is a good start: http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/

Thanks
Best Regards

On Wed, Dec 16, 2015 at 2:51 AM, Alexander Pivovarov <apivova...@gmail.com> wrote:
> I have a big folder of ORC files. The files have a duration field (e.g.
> 3, 12, 26, etc). [...]
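The mechanics behind the linked map-side join, reduced to plain Scala so they can run without a cluster: the small ranges table lives in memory on every task, and each record of the big side is mapped to its range name locally, so no shuffle is needed. In real Spark code the ranges array would be shipped via sc.broadcast(...) and the lookup would run inside rdd.map { ... }; joinSide and the sample data here are illustrative:

```scala
// The small side of the join: the 8-row ranges table from the question
// (first three rows shown). In Spark this would be a broadcast variable.
val ranges = Array((0L, 10L, "A"), (10L, 20L, "B"), (20L, 30L, "C"))

// Map-side join: look up each duration in the locally held small table.
// With Spark this body would be the function passed to rdd.map.
def joinSide(durations: Seq[Long]): Seq[(Long, String)] =
  durations.map { v =>
    val name = ranges.find { case (min, max, _) => min <= v && v < max }
                     .map(_._3).getOrElse("")
    (v, name)
  }

println(joinSide(Seq(3L, 12L, 26L))) // List((3,A), (12,B), (26,C))
```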
How to do map join in Spark SQL
I have a big folder of ORC files. The files have a duration field (e.g. 3, 12, 26, etc).
I also have a small json file (just 8 rows) with range definitions (min, max, name):
0, 10, A
10, 20, B
20, 30, C
etc.

Because I can not do an equi-join between duration and the range min/max, I need to do a cross join and apply a WHERE condition to take the records which belong to the range.
A cross join is an expensive operation; I think it's better if this particular join is done using a map join.

How to do a map join in Spark SQL?