Re: Best practice for join
How about using Spark SQL? https://spark.apache.org/sql/

Thanks
Best Regards

On Wed, Nov 5, 2014 at 1:53 AM, Benyi Wang bewang.t...@gmail.com wrote:

I need to join RDD[A], RDD[B], and RDD[C]. Here is what I did:

// build (K, V) pairs from A and B to prepare the join
val ja = A.map( r => (K1, Va) )
val jb = B.map( r => (K1, Vb) )

// join A and B
val jab = ja.join(jb)

// build (K, V) pairs from the joined result of A and B to prepare joining with C
val jc = C.map( r => (K2, Vc) )
jab.join(jc).map( ... => (K, V) ).reduceByKey(_ + _)

Because A has multiple fields, Va is a tuple with more than two fields. Scala tuples are said not to be specialized, so there is a boxing/unboxing cost. I therefore tried using case classes for Va, Vb, and Vc, for the compound keys K2 and K, and for V, which is a pair of count and ratio (_ + _ creates a new ratio). I registered those case classes with Kryo.

The shuffle read/write sizes look smaller, but I found the GC overhead is really high: GC time is about 20-30% of the duration of the reduceByKey tasks. I think a lot of new objects are created from the case classes during map/reduce. How can I make this better?
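[For reference, "register those case classes in Kryo" on Spark 1.0.x is done through a KryoRegistrator, roughly as below. The case-class fields are placeholders, since the actual schemas of A, B, and C aren't shown in the thread; this is a sketch, not the poster's actual code.]

```scala
import com.esotericsoftware.kryo.Kryo
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoRegistrator

// Placeholder case classes standing in for the poster's Va and V;
// Vb, Vc, K2, and K would be registered the same way.
case class Va(f1: Int, f2: Double, f3: Double)
case class V(count: Long, ratio: Double)

// Registrator that tells Kryo about the shuffled classes up front,
// so serialized records carry a small class id instead of a full class name.
class JoinRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Va])
    kryo.register(classOf[V])
    // ... register the remaining key/value case classes here
  }
}

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // must be the fully-qualified class name of the registrator
  .set("spark.kryo.registrator", "JoinRegistrator")
```

(On Spark 1.2+, `conf.registerKryoClasses(Array(...))` does the same thing in one call, but that API does not exist in 1.0.0.)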
Re: Best practice for join
I'm using spark-1.0.0 in CDH 5.1.0. The big problem is that Spark SQL doesn't support hash joins in this version.

On Tue, Nov 4, 2014 at 10:54 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

How about using Spark SQL? https://spark.apache.org/sql/
Re: Best practice for join
Oh, in that case, if you want to reduce the GC time, you can specify the level of parallelism along with your join and reduceByKey operations.

Thanks
Best Regards

On Wed, Nov 5, 2014 at 1:11 PM, Benyi Wang bewang.t...@gmail.com wrote:

I'm using spark-1.0.0 in CDH 5.1.0. The big problem is that Spark SQL doesn't support hash joins in this version.
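[Concretely, `join` and `reduceByKey` both accept an optional numPartitions argument. A schematic sketch against the pipeline from the original question; the value 200 is an illustrative guess, not a recommendation from the thread:]

```scala
val numPartitions = 200  // illustrative; tune to your cluster and data size

// More partitions mean smaller per-task state during the shuffle,
// which usually reduces GC pressure in each task.
val jab = ja.join(jb, numPartitions)
val jc  = C.map( r => (K2, Vc) )

jab.join(jc, numPartitions)
   .map( ... => (K, V) )                 // schematic, as in the original post
   .reduceByKey(_ + _, numPartitions)
```

You can also set `spark.default.parallelism` in the SparkConf so that every shuffle operation without an explicit argument picks up the same value.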