Hello,

I am trying to process a dataset that is approximately 2 TB using a cluster
with 4.5 TB of RAM.  The data is in Parquet format and is initially loaded
into a DataFrame.  A subset of the data is then queried and converted to an
RDD for more complicated processing.  The first stage of that processing is
a mapToPair that uses each row's ID as the key in a tuple.  The data then
goes through a combineByKey operation to group all values with the same
key.  This operation always exceeds the maximum cluster memory and the job
eventually fails.  While it is shuffling there are a lot of "spilling
in-memory map to disk" messages.  I am wondering: if the data were
initially partitioned so that all rows with the same ID resided in the same
partition, would it need to do less shuffling and complete correctly?
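
For reference, the keying and grouping stage looks roughly like this
(LocationRecord and getId() are simplified placeholders, and "locations"
stands for the JavaRDD<LocationRecord> produced by the query shown further
down):

import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;

// key each record by its ID
JavaPairRDD<String, LocationRecord> keyed =
        locations.mapToPair(rec -> new Tuple2<>(rec.getId(), rec));

// group all records that share an ID into a list
JavaPairRDD<String, List<LocationRecord>> grouped = keyed.combineByKey(
        rec -> { List<LocationRecord> l = new ArrayList<>(); l.add(rec); return l; },  // createCombiner
        (l, rec) -> { l.add(rec); return l; },                                         // mergeValue
        (l1, l2) -> { l1.addAll(l2); return l1; });                                    // mergeCombiners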

To do the initial load I am using:

sqlContext.read().parquet(inputPathArray)
        .repartition(10000, new Column("id"));

I am not sure whether this is the correct way to partition a DataFrame, so
my first question is: is the above correct?
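
In full context, the load step looks roughly like this (the variable name
df is a placeholder; I register the result as a temp table so it can be
queried below):

import org.apache.spark.sql.Column;
import org.apache.spark.sql.DataFrame;

DataFrame df = sqlContext.read()
        .parquet(inputPathArray)                 // array of Parquet paths
        .repartition(10000, new Column("id"));   // hash-partition rows by id
df.registerTempTable("standardlocationrecords");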

My next question is: when I go from the DataFrame to an RDD using:

JavaRDD<LocationRecord> locationsForSpecificKey = sqlc.sql(
        "SELECT * FROM standardlocationrecords WHERE customerID = "
            + customerID + " AND partnerAppID = " + partnerAppID)
        .toJavaRDD()
        .map(new LocationRecordFromRow()::apply);

is the partitioning scheme from the DataFrame preserved, or do I need to
repartition after the mapToPair by calling rdd.partitionBy with a custom
HashPartitioner that uses the hash of the ID field?
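
If repartitioning is needed, is something along these lines the right
idea?  (Just a sketch of what I have in mind, not tested code; since the
pair key is already the ID, I am guessing the built-in HashPartitioner is
enough, and 10000 is a placeholder partition count.)

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.List;

HashPartitioner partitioner = new HashPartitioner(10000);

// one shuffle here to co-locate all records with the same ID
JavaPairRDD<String, LocationRecord> keyed = locationsForSpecificKey
        .mapToPair(rec -> new Tuple2<>(rec.getId(), rec))
        .partitionBy(partitioner);

// pass the same partitioner to combineByKey so it should not need to
// shuffle again
JavaPairRDD<String, List<LocationRecord>> grouped = keyed.combineByKey(
        rec -> { List<LocationRecord> l = new ArrayList<>(); l.add(rec); return l; },
        (l, rec) -> { l.add(rec); return l; },
        (l1, l2) -> { l1.addAll(l2); return l1; },
        partitioner);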

My goal is to reduce the shuffling when doing the final combineByKey to
prevent the job from running out of memory and failing.  Any help would be
greatly appreciated.

Thanks,
Nathan
