Re: How to minimize shuffling on Spark dataframe Join?

2015-08-19 Thread Romi Kuntsman
If you create a PairRDD from the DataFrame, using
dataFrame.toJavaRDD().mapToPair(), then you can call
partitionBy(someCustomPartitioner), which will partition the RDD by the
key of the pair.
Operations on it (like joining with another RDD) will then take this
partitioning into account.
I'm not sure whether DataFrames support this directly yet.
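
For example, a minimal PySpark sketch of the same idea (mapToPair is the
Java API; in Python a plain map to key-value tuples plays the same role;
the dataframe and column names here are illustrative):

# Build pair RDDs keyed by the join column.
student_pairs = student_df.rdd.map(lambda row: (row['studentid'], row))
result_pairs = result_df.rdd.map(lambda row: (row['studentid'], row))

# Partition both sides with the same partitioner (same partition count),
# so rows with equal keys land in the same partition. This step itself
# shuffles once, but the join below then needs no further shuffle.
num_partitions = 200
student_pairs = student_pairs.partitionBy(num_partitions)
result_pairs = result_pairs.partitionBy(num_partitions)

joined = student_pairs.join(result_pairs)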


Re: How to minimize shuffling on Spark dataframe Join?

2015-08-12 Thread Abdullah Anwar
Hi Hemant,

Thank you for your reply.

I think the source of my dataframe is not partitioned on the key; it's
an Avro file where 'id' is a field, but I don't know how to read a file
and configure the partition key at the same time. I couldn't find
anything in SQLContext.read.load, or on the DataFrame itself, that lets
you set a partition key. If each dataframe could be partitioned on the
specified key, would Spark place the same partition range on the same
machine for two different dataframes?
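
For reference, this is roughly how the file is loaded today, using the
Databricks spark-avro package (the path is illustrative); the reader
takes format options but, as far as I can tell, offers nothing to
request partitioning on a column:

# Loading Avro via the spark-avro package; no partition-key setting.
student_df = sqlContext.read \
    .format('com.databricks.spark.avro') \
    .load('/path/to/students.avro')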

   What are the general tips for making joins faster?

Best Regards,
Abdullah




-- 
Abdullah


Fwd: How to minimize shuffling on Spark dataframe Join?

2015-08-11 Thread Abdullah Anwar
I have two dataframes like this:

  student_rdf = (studentid, name, ...)
  student_result_rdf = (studentid, gpa, ...)

We need to join these two dataframes, and we currently do it like this:

student_rdf.join(student_result_rdf, student_result_rdf['studentid']
== student_rdf['studentid'])

It is simple, but it creates a lot of data shuffling across worker
nodes. Since the join key is the same on both sides, if each dataframe
could be partitioned on that key (studentid) there should be no
shuffling at all, because matching rows (based on the partition key)
would reside on the same node. Is it possible to hint Spark to do this?

So, I am looking for a way to partition data on a column while I read a
dataframe from input. And if it is possible for Spark to understand
that the partition keys of the two dataframes match, how do I do that?
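
To see the shuffle concretely: explain() prints the physical plan, and
the Exchange operators in it are the points where Spark shuffles data
across the cluster (dataframe names as above):

joined = student_rdf.join(student_result_rdf,
    student_result_rdf['studentid'] == student_rdf['studentid'])
joined.explain()  # 'Exchange' nodes in the plan mark the shuffles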




-- 
Abdullah


Re: How to minimize shuffling on Spark dataframe Join?

2015-08-11 Thread Hemant Bhanawat
Is the source of your dataframe partitioned on the key? From your mail,
it looks like it is not. If that is the case, you will have to shuffle
the data anyway in order to partition it.

The other part of your question is how to co-group data from two
dataframes based on a key. For RDDs, cogroup in PairRDDFunctions is one
way; I am not sure whether something similar is available for
DataFrames.
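
For example, a minimal RDD cogroup sketch in PySpark (sc is an existing
SparkContext; the data is made up):

# cogroup groups the values of both RDDs by key, yielding
# (key, (values_from_first, values_from_second)).
students = sc.parallelize([(1, 'alice'), (2, 'bob')])
results = sc.parallelize([(1, 3.8), (1, 3.9), (2, 3.5)])

for key, (names, gpas) in students.cogroup(results).collect():
    print(key, list(names), list(gpas))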

Hemant




