Re: Best practice for join

2014-11-04 Thread Akhil Das
How about using SparkSQL (https://spark.apache.org/sql/)?
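For instance, a rough sketch of what that could look like on the Spark 1.0.x
SQL API. The row types, table names, and query below are made up for
illustration, sc is an existing SparkContext, and exactly how much of this
SQL the 1.0.x parser accepts would need checking:

import org.apache.spark.sql.SQLContext

// Hypothetical row types standing in for A, B, and C.
case class ARow(k1: Long, va: Double)
case class BRow(k1: Long, vb: Double)
case class CRow(k2: Long, vc: Double)

val sqlContext = new SQLContext(sc)
import sqlContext.createSchemaRDD   // implicitly converts RDD[Product] to SchemaRDD

// a: RDD[ARow], b: RDD[BRow], c: RDD[CRow]
a.registerAsTable("a")
b.registerAsTable("b")
c.registerAsTable("c")

val joined = sqlContext.sql(
  """SELECT a.k1, b.vb, c.vc
    |FROM a JOIN b ON a.k1 = b.k1
    |JOIN c ON b.k1 = c.k2""".stripMargin)

joined.collect().foreach(println)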

Thanks
Best Regards

On Wed, Nov 5, 2014 at 1:53 AM, Benyi Wang bewang.t...@gmail.com wrote:

 I need to join RDD[A], RDD[B], and RDD[C]. Here is what I did,

 # build (K,V) from A and B to prepare the join

 val ja = A.map(r => (K1, Va))
 val jb = B.map(r => (K1, Vb))

 # join A, B

 val jab = ja.join(jb)

 # build (K,V) from the joined result of A and B to prepare joining with C

 val jc = C.map(r => (K2, Vc))
 jab.join(jc).map(r => (K, V)).reduceByKey(_ + _)

 Because A has multiple fields, Va is a tuple with more than two fields. It
 is said that Scala tuples may not be specialized and there is
 boxing/unboxing overhead, so I tried using case classes for Va, Vb, and Vc,
 for the compound keys K2 and K, and for V, which is a pair of count and
 ratio; _ + _ creates a new ratio. I registered those case classes with Kryo.

 The shuffle read/write sizes look smaller, but the GC overhead is really
 high: GC time is about 20~30% of the duration of the reduceByKey tasks. I
 think a lot of new objects are created from the case classes during
 map/reduce.

 How can I make this better?
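A minimal sketch of the shape being described, with hypothetical stand-ins
for Va, Vb, Vc, the compound keys, and V, plus Spark 1.0-style Kryo
registration via a KryoRegistrator. The "+" on V is one possible reading of
"_ + _ will create a new ratio" (count-weighted), not the poster's actual
definition:

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Hypothetical stand-ins for the value and key types.
case class Va(f1: Long, f2: Double, f3: String)
case class Vb(g1: Double)
case class Vc(h1: Double)
case class K2(id1: Long, id2: Long)
case class K(id1: Long, id2: Long)
case class V(count: Long, ratio: Double) {
  // Sum the counts and recompute a count-weighted ratio.
  def +(other: V): V = {
    val c = count + other.count
    V(c, (ratio * count + other.ratio * other.count) / c)
  }
}

// Spark 1.0-era Kryo setup: register the classes in a KryoRegistrator
// and name it in the SparkConf (in practice, use its fully-qualified name).
class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Va])
    kryo.register(classOf[Vb])
    kryo.register(classOf[Vc])
    kryo.register(classOf[K2])
    kryo.register(classOf[K])
    kryo.register(classOf[V])
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "MyRegistrator")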



Re: Best practice for join

2014-11-04 Thread Benyi Wang
I'm using spark-1.0.0 in CDH 5.1.0. The big problem is that SparkSQL doesn't
support hash joins in this version.

On Tue, Nov 4, 2014 at 10:54 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 How about using SparkSQL (https://spark.apache.org/sql/)?

 Thanks
 Best Regards






Re: Best practice for join

2014-11-04 Thread Akhil Das
Oh, in that case, if you want to reduce the GC time, you can specify the
level of parallelism on your join and reduceByKey operations.
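For example, a sketch only, following the pseudocode from earlier in the
thread (K1, K2, K, Va, Vb, Vc, V are placeholders) with a made-up partition
count to tune:

val numPartitions = 200   // placeholder; tune to cluster cores and data size

val jab = ja.join(jb, numPartitions)   // shuffle into numPartitions partitions
val jc  = C.map(r => (K2, Vc))

jab.join(jc, numPartitions)
   .map(r => (K, V))
   .reduceByKey(_ + _, numPartitions)  // same for the aggregation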

Thanks
Best Regards

On Wed, Nov 5, 2014 at 1:11 PM, Benyi Wang bewang.t...@gmail.com wrote:

 I'm using spark-1.0.0 in CDH 5.1.0. The big problem is that SparkSQL doesn't
 support hash joins in this version.
