Thank you for your detailed reply. First, the purpose of the MyKey class is to wrap a byte[] and provide equals() and the Comparable interface for it.
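In sketch form, it is something like the following (assumed field and method names, not the actual class):

import java.io.Serializable;
import java.util.Arrays;

// Minimal sketch of a byte[] key wrapper; names are assumptions.
public class MyKey implements Comparable<MyKey>, Serializable {
    private final byte[] rowKey;

    public MyKey(byte[] rowKey) {
        this.rowKey = rowKey;
    }

    public byte[] getRowKey() {
        return rowKey;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MyKey)) return false;
        return Arrays.equals(rowKey, ((MyKey) o).rowKey);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(rowKey);
    }

    @Override
    public int compareTo(MyKey other) {
        // Unsigned lexicographic byte comparison (assumed to match HBase row key ordering).
        int len = Math.min(rowKey.length, other.rowKey.length);
        for (int i = 0; i < len; i++) {
            int cmp = (rowKey[i] & 0xff) - (other.rowKey[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return rowKey.length - other.rowKey.length;
    }
}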
groupByKey() is for performance. I have to merge the byte[]s that have the same key. If the merging is done with reduceByKey(), a lot of intermediate byte[] allocations and System.arraycopy() calls are executed, and it is too slow. So I had to resort to groupByKey(), and in the callback I allocate one byte[] with the total size of the grouped byte[]s and arraycopy() each of them into it. groupByKey() works for this, since the size of a group is manageable in my application. In fact, the same process worked well when I implemented it with the HBase Put class, so I assume that groupByKey() itself is not the problem.
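In sketch form, the merge looks roughly like this (the variable names are placeholders, and rdd is the JavaPairRDD<MyKey, byte[]> from the code quoted below):

// Sketch of the groupByKey-based merge described above; not the actual code.
JavaPairRDD<MyKey, byte[]> merged = rdd
    .groupByKey()
    .mapValues(values -> {
        // First pass: compute the total size so the result is allocated exactly once.
        int totalSize = 0;
        for (byte[] v : values) {
            totalSize += v.length;
        }
        // Second pass: copy each value into the pre-sized buffer.
        byte[] result = new byte[totalSize];
        int offset = 0;
        for (byte[] v : values) {
            System.arraycopy(v, 0, result, offset, v.length);
            offset += v.length;
        }
        return result;
    });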
WithIndex is used to exclude the record from the first partition. I could remove that record after collect() and sort(), but this was easier.

I think the problem is that when mapPartitionsWithIndex() executes, the partition is too big (several GB - it is the size of an HBase region, which has to be several GB). I could allocate more memory to each executor, but then I cannot spawn enough executors for the previous RDD operations.

It would be nice if:
- mapPartitionsWithIndex() loaded the partition in small chunks as the iterator sweeps through it, or
- there were a function named firstRecordsOfPartitions().

About the lost parallelism: I thought that approach was a possible alternative to mapPartitionsWithIndex() that could run with a smaller memory footprint.

About the strange Spark behavior: I don't think Spark is malfunctioning. I just want to see more detailed flow information - how can I check?

Thanks.

-----Original Message-----
From: Sean Owen [mailto:so...@cloudera.com]
Sent: Monday, September 22, 2014 5:46 PM
To: innowireless TaeYun Kim
Cc: user
Subject: Re: Bulk-load to HBase

I see a number of potential issues:

On Mon, Sep 22, 2014 at 8:42 AM, innowireless TaeYun Kim <taeyun....@innowireless.co.kr> wrote:

> JavaPairRDD<MyKey, byte[]> rdd =
>   // MyKey has a byte[] member for rowkey

Two byte[] with the same contents are not equals(), so this won't work as you intend for a key. Is there more to it? I assume so, given your comment later.

> .groupByKey()

This forces all values for each key to be held in memory when the key is processed. Are you sure you can't do something comparable with reduceByKey?

> rdd.mapPartitionsWithIndex(...)
>   // Gets the first record of each partition.
>   // The first partition's first record is excluded, since it's not needed.

You won't need "WithIndex" for that, though I doubt it matters.

> 1. OutOfMemory exception on mapPartitionsWithIndex() for splitKeys.
>
> In my case, the number of regions is fairly small for the RDD, and the size of a region is big.
> This is intentional, since the reasonable size of an HBase region is several GB.
> But for Spark, it is too big for a partition to be handled by an executor.
> I thought mapPartitionsWithIndex would not load the entire partition, but I was wrong.
> Maybe it loaded the whole partition while I only wanted to fetch the first record of the iterator.

You can give executors more memory, but I think groupByKey is your problem.

> I could save all the partitions with the save...() API and then load each partition separately and call first().
> But it does not feel right. Parallelism is lost.

Why is that necessary?

> 2. Strange Spark behavior
>
> It is not as fatal as 1, but it's strange.
> In my code, the flow is as follows: flatMapToPair -> groupByKey -> mapValues -> sortByKey
> But when I watch the Spark UI, it is executed as follows: flatMapToPair -> sortByKey -> sortByKey (again!) -> mapValues
> Since in my case the number of records between flatMapToPair and mapValues is very large, it seems that Spark executes sortByKey at the worst possible time.
> I tried to trick Spark by replacing mapValues with mapToPair, but the execution order did not change.
> Why?

The final operation called by a top-level method X may not be X. Double-check that these operations are from your current run and not an earlier one, and that the code you're executing is what you think it is. It is not going to somehow execute things in an order that is semantically different, no.
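For reference, the splitKeys step quoted above can be sketched like this (assumed names; it keeps only the first record's key of each partition except partition 0, relies on MyKey being Comparable, and needs java.util.* plus scala.Tuple2):

// Sketch only; rdd is the JavaPairRDD<MyKey, byte[]> from the quoted code.
List<MyKey> splitKeys = new ArrayList<>(
    rdd.mapPartitionsWithIndex(
        (Integer index, Iterator<Tuple2<MyKey, byte[]>> it) -> {
            List<MyKey> firstKey = new ArrayList<>();
            if (index > 0 && it.hasNext()) {
                // Only the first record of the partition is read here,
                // though the whole partition may still be materialized upstream.
                firstKey.add(it.next()._1());
            }
            return firstKey.iterator();
        },
        true)
    .collect());
Collections.sort(splitKeys);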