Re: How to do map join in Spark SQL

2015-12-20 Thread Alexander Pivovarov
The spark.sql.autoBroadcastJoinThreshold default value in 1.5.2 is 10MB.
According to the console output Spark is doing the broadcast, but a query
like the following still does not perform well:

select
 big_t.*,
 small_t.name range_name
from big_t
join small_t on (1=1)
where small_t.min <= big_t.v and big_t.v < small_t.max
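
For comparison, the same range join can be expressed through the DataFrame API
with an explicit broadcast hint (a sketch only; the table and column names are
assumed from the query above, and org.apache.spark.sql.functions.broadcast is
available since Spark 1.5.0):

import org.apache.spark.sql.functions.broadcast

val bigDf   = sqlContext.table("big_t")
val smallDf = sqlContext.table("small_t")

// hint that small_t is small enough to ship to every executor; with a
// non-equi condition Spark should plan a broadcast-style (nested loop) join
val joined = bigDf.join(
  broadcast(smallDf),
  bigDf("v") >= smallDf("min") && bigDf("v") < smallDf("max"))
// then select the big_t columns plus name as range_name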

Instead of the join, I registered a UDF which returns range_name:

val ranges = sqlContext.sql("select min_v, max_v, name from small_t")
  .collect()
  .map(r => (r.getLong(0), r.getLong(1), r.getString(2)))
  .sortBy(_._1)

sqlContext.udf.register("findRangeName",
  (v: java.lang.Long) => RangeUDF.findName(v, ranges))

// RangeUDF.findName
def findName(vObj: java.lang.Long,
             ranges: Array[(Long, Long, String)]): String = {
  val v = if (vObj == null) -1L else vObj.longValue()
  ranges.find(x => x._1 <= v && v < x._2).map(_._3).getOrElse("")
}

// Now I can use the UDF to get range_name
select
 big_t.*,
 findRangeName(v) range_name
from big_t
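
A possible refinement of the UDF above (a sketch only, reusing the ranges array
and RangeUDF.findName): put the collected ranges into a Spark broadcast
variable, so each executor receives one copy instead of the lookup array
travelling inside every task closure.

val rangesBc = sc.broadcast(ranges)

sqlContext.udf.register("findRangeName",
  (v: java.lang.Long) => RangeUDF.findName(v, rangesBc.value))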

On Sun, Dec 20, 2015 at 9:16 AM, Chris Fregly <ch...@fregly.com> wrote:

> this type of broadcast should be handled by Spark SQL/DataFrames
> automatically.
>
> this is the primary cost-based, physical-plan query optimization that the
> Spark SQL Catalyst optimizer supports.
>
> in Spark 1.5 and before, you can trigger this optimization by properly
> setting spark.sql.autoBroadcastJoinThreshold to a value that is *above* the
> size of your smaller table when fully materialized in JVM memory (not the
> serialized size of the data on disk - a very common mistake).
>
> in Spark 1.6+, there are heuristics to make this decision dynamically -
> and even allow hybrid execution where certain keys - within the same Spark
> job - will be broadcast and others won't, depending on their relative
> "hotness" for that particular job.
>
> a common theme of Spark 1.6 and beyond will be adaptive physical-plan
> execution, adaptive memory allocation between the RDD cache and the Spark
> execution engine, adaptive cluster resource allocation, etc.
>
> the goal being to minimize manual configuration and enable many different
> types of workloads to run efficiently on the same Spark cluster.
>
> On Dec 19, 2015, at 12:10 PM, Alexander Pivovarov <apivova...@gmail.com>
> wrote:
>
> I collected the small DF to an array of Tuple3.
> Then I registered a UDF with a function that does a lookup in the array.
> Then I just run a select which uses the UDF.
> On Dec 18, 2015 1:06 AM, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:
>
>> You can broadcast your json data and then do a map side join. This
>> article is a good start
>> http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/
>>
>> Thanks
>> Best Regards
>>
>> On Wed, Dec 16, 2015 at 2:51 AM, Alexander Pivovarov <
>> apivova...@gmail.com> wrote:
>>
>>> I have a big folder of ORC files. The files have a duration field (e.g.
>>> 3, 12, 26, etc.)
>>> Also I have a small json file (just 8 rows) with a ranges definition (min,
>>> max, name):
>>> 0, 10, A
>>> 10, 20, B
>>> 20, 30, C
>>> etc
>>>
>>> Because I cannot do an equi-join between duration and the range min/max, I
>>> need to do a cross join and apply a WHERE condition to take the records
>>> which belong to the range.
>>> A cross join is an expensive operation; I think it's better if this
>>> particular join is done as a map join.
>>>
>>> How do I do a map join in Spark SQL?
>>>
>>
>>


Re: How to do map join in Spark SQL

2015-12-20 Thread Chris Fregly
this type of broadcast should be handled by Spark SQL/DataFrames automatically.

this is the primary cost-based, physical-plan query optimization that the Spark 
SQL Catalyst optimizer supports.

in Spark 1.5 and before, you can trigger this optimization by properly setting
spark.sql.autoBroadcastJoinThreshold to a value that is *above* the size of
your smaller table when fully materialized in JVM memory (not the serialized
size of the data on disk - a very common mistake).
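
a minimal sketch of doing that and checking what plan comes out (the value is
in bytes; table and column names are taken from earlier in this thread):

// raise the broadcast threshold to ~50MB, then inspect the physical plan
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

sqlContext.sql("""
  select big_t.*, small_t.name range_name
  from big_t join small_t on (1 = 1)
  where small_t.min <= big_t.v and big_t.v < small_t.max
""").explain()
// look for BroadcastHashJoin (equi-joins) or BroadcastNestedLoopJoin
// (non-equi joins, such as the range condition here) in the output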

in Spark 1.6+, there are heuristics to make this decision dynamically - and
even allow hybrid execution where certain keys - within the same Spark job -
will be broadcast and others won't, depending on their relative "hotness" for
that particular job.

a common theme of Spark 1.6 and beyond will be adaptive physical-plan execution,
adaptive memory allocation between the RDD cache and the Spark execution engine,
adaptive cluster resource allocation, etc.

the goal being to minimize manual configuration and enable many different types
of workloads to run efficiently on the same Spark cluster.

> On Dec 19, 2015, at 12:10 PM, Alexander Pivovarov <apivova...@gmail.com> 
> wrote:
> 
> I collected the small DF to an array of Tuple3.
> Then I registered a UDF with a function that does a lookup in the array.
> Then I just run a select which uses the UDF.
> 
>> On Dec 18, 2015 1:06 AM, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:
>> You can broadcast your json data and then do a map side join. This article 
>> is a good start http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/
>> 
>> Thanks
>> Best Regards
>> 
>>> On Wed, Dec 16, 2015 at 2:51 AM, Alexander Pivovarov <apivova...@gmail.com> 
>>> wrote:
>>> I have a big folder of ORC files. The files have a duration field (e.g.
>>> 3, 12, 26, etc.)
>>> Also I have a small json file (just 8 rows) with a ranges definition (min,
>>> max, name):
>>> 0, 10, A
>>> 10, 20, B
>>> 20, 30, C 
>>> etc
>>> 
>>> Because I cannot do an equi-join between duration and the range min/max, I
>>> need to do a cross join and apply a WHERE condition to take the records
>>> which belong to the range.
>>> A cross join is an expensive operation; I think it's better if this
>>> particular join is done as a map join.
>>>
>>> How do I do a map join in Spark SQL?


Re: How to do map join in Spark SQL

2015-12-19 Thread Alexander Pivovarov
I collected the small DF to an array of Tuple3.
Then I registered a UDF with a function that does a lookup in the array.
Then I just run a select which uses the UDF.
On Dec 18, 2015 1:06 AM, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:

> You can broadcast your json data and then do a map side join. This article
> is a good start http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/
>
> Thanks
> Best Regards
>
> On Wed, Dec 16, 2015 at 2:51 AM, Alexander Pivovarov <apivova...@gmail.com
> > wrote:
>
>> I have a big folder of ORC files. The files have a duration field (e.g.
>> 3, 12, 26, etc.)
>> Also I have a small json file (just 8 rows) with a ranges definition (min,
>> max, name):
>> 0, 10, A
>> 10, 20, B
>> 20, 30, C
>> etc
>>
>> Because I cannot do an equi-join between duration and the range min/max, I
>> need to do a cross join and apply a WHERE condition to take the records
>> which belong to the range.
>> A cross join is an expensive operation; I think it's better if this
>> particular join is done as a map join.
>>
>> How do I do a map join in Spark SQL?
>>
>
>


Re: How to do map join in Spark SQL

2015-12-18 Thread Akhil Das
You can broadcast your json data and then do a map-side join. This article
is a good start: http://dmtolpeko.com/2015/02/20/map-side-join-in-spark/
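
For the range lookup discussed earlier in this thread, the pattern from the
article would look roughly like the sketch below (paths and field names are
assumptions; reading ORC in Spark 1.5 requires a HiveContext):

// collect the small json table on the driver and broadcast it
val ranges = sqlContext.read.json("/path/to/ranges.json")
  .collect()
  .map(r => (r.getAs[Long]("min"), r.getAs[Long]("max"), r.getAs[String]("name")))
val rangesBc = sc.broadcast(ranges)

// resolve the range name inside a map over the big ORC data (the map-side join)
val withRangeName = sqlContext.read.orc("/path/to/big_orc_folder")
  .map { row =>
    val d = row.getAs[Long]("duration")
    rangesBc.value
      .collectFirst { case (min, max, name) if min <= d && d < max => (d, name) }
      .getOrElse((d, ""))
  }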

Thanks
Best Regards

On Wed, Dec 16, 2015 at 2:51 AM, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> I have a big folder of ORC files. The files have a duration field (e.g.
> 3, 12, 26, etc.)
> Also I have a small json file (just 8 rows) with a ranges definition (min,
> max, name):
> 0, 10, A
> 10, 20, B
> 20, 30, C
> etc
>
> Because I cannot do an equi-join between duration and the range min/max, I
> need to do a cross join and apply a WHERE condition to take the records
> which belong to the range.
> A cross join is an expensive operation; I think it's better if this
> particular join is done as a map join.
>
> How do I do a map join in Spark SQL?
>


How to do map join in Spark SQL

2015-12-15 Thread Alexander Pivovarov
I have a big folder of ORC files. The files have a duration field (e.g.
3, 12, 26, etc.)
Also I have a small json file (just 8 rows) with a ranges definition (min,
max, name):
0, 10, A
10, 20, B
20, 30, C
etc

Because I cannot do an equi-join between duration and the range min/max, I need
to do a cross join and apply a WHERE condition to take the records which belong
to the range.
A cross join is an expensive operation; I think it's better if this particular
join is done as a map join.

How do I do a map join in Spark SQL?