It's tricky, really, since you may not know upfront how much data is in
there. You could take a look at how much data is in the HBase tables to
get a rough idea.

It may take a bit of trial and error: running out of memory while trying
to cache the dataset, checking the Spark UI on port 4040 to see how much
is cached and how much memory remains available, and so on. You should
also take a look at http://spark.apache.org/docs/latest/tuning.html for
ideas around memory and serialization tuning.
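
One concrete knob from that page: if the raw objects don't fit, you can
cache in serialized form to trade CPU for memory. A sketch using the
standard StorageLevel API:

    import org.apache.spark.storage.StorageLevel

    // Serialized in-memory storage: much more compact than raw objects,
    // at the cost of deserialization CPU on each access.
    hBaseRDD.persist(StorageLevel.MEMORY_ONLY_SER)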

Broadly speaking, what you want to do is filter out as much data as
possible first, and cache only the subset on which you'll be performing
multiple passes or computations. For example, based on your code above, you
may in fact only wish to cache the RDD of "interesting" fields. It all
depends on what you're trying to achieve.
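
For illustration, a rough sketch of that pattern, reusing the readFields
helper from your code below (so the only thing cached is the filtered
subset):

    // Filter first, then cache only the reduced "interesting" subset.
    val interesting = readFields(hBaseRDD)
    interesting.cache()

    // Only the first action pays the HBase scan cost; later passes
    // over 'interesting' are served from memory.
    val numInteresting = interesting.count()
    val sample = interesting.take(10)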

If you will only be doing one pass through the data anyway (like running a
count every time on the full dataset) then caching is not going to help you.


On Tue, Feb 25, 2014 at 4:59 PM, Soumitra Kumar <kumar.soumi...@gmail.com> wrote:

> Thanks Nick.
>
> How do I figure out if the RDD fits in memory?
>
>
> On Tue, Feb 25, 2014 at 1:04 AM, Nick Pentreath
> <nick.pentre...@gmail.com> wrote:
>
>> cache() only caches the data on the first action (the count); the first
>> time, it still needs to read the data from the source. So the first time
>> you call count it will take the same amount of time whether cache is
>> enabled or not. The second time you call count on a cached RDD, you
>> should see that it takes a lot less time (assuming the data fits in
>> memory).
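>>
>> A minimal sketch of that behaviour:
>>
>>     hBaseRDD.cache()
>>     val first = hBaseRDD.count()   // triggers the scan and fills the cache
>>     val second = hBaseRDD.count()  // served from the cached blocks, much faster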
>>
>>
>> On Tue, Feb 25, 2014 at 9:38 AM, Soumitra Kumar
>> <kumar.soumi...@gmail.com> wrote:
>>
>>> I did try with 'hBaseRDD.cache()', but don't see any improvement.
>>>
>>> My expectation is that with cache enabled, there should not be any
>>> penalty for the 'hBaseRDD.count' call.
>>>
>>>
>>>
>>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath
>>> <nick.pentre...@gmail.com> wrote:
>>>
>>>> Yes, you're initiating a scan for each count call. The normal way to
>>>> improve this would be to use cache(), which is what you have in your
>>>> commented-out line:
>>>> // hBaseRDD.cache()
>>>>
>>>> If you uncomment that line, you should see an improvement overall.
>>>>
>>>> If caching is not an option for some reason (maybe data is too large),
>>>> then you can implement an overall count in your readFields method using
>>>> accumulators:
>>>>
>>>> val count = sc.accumulator(0L)
>>>> ...
>>>> In your flatMap function, do count += 1 for each row (regardless of
>>>> whether it is "interesting" or not).
>>>>
>>>> In your main method, after an action has run (e.g. count in your case),
>>>> call val totalCount = count.value.
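>>>>
>>>> Putting that together, a rough sketch (simplified to just your "src"
>>>> key, and assuming the accumulator is created on the driver):
>>>>
>>>>     val count = sc.accumulator(0L)
>>>>
>>>>     def readFields(rdd: RDD[(ImmutableBytesWritable, Result)]): RDD[List[Array[Byte]]] = {
>>>>         rdd.flatMap { kv =>
>>>>             count += 1  // counts every scanned row, interesting or not
>>>>             val col = kv._2.getValue(Bytes.toBytes("cf"), Bytes.toBytes("src"))
>>>>             if (col != null) Some(List(col)) else None
>>>>         }
>>>>     }
>>>>
>>>>     val interestingCount = readFields(hBaseRDD).count()  // the action
>>>>     val totalCount = count.value  // total rows, readable after the action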
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar
>>>> <kumar.soumi...@gmail.com> wrote:
>>>>
>>>>> I have some code which reads an HBase table and counts the number of
>>>>> rows containing a particular field.
>>>>>
>>>>>     import org.apache.hadoop.hbase.client.Result
>>>>>     import org.apache.hadoop.hbase.io.ImmutableBytesWritable
>>>>>     import org.apache.hadoop.hbase.util.Bytes
>>>>>     import org.apache.spark.rdd.RDD
>>>>>
>>>>>     def readFields(rdd: RDD[(ImmutableBytesWritable, Result)]): RDD[List[Array[Byte]]] = {
>>>>>         rdd.flatMap(kv => {
>>>>>             // Set of interesting keys for this use case
>>>>>             val keys = List("src")
>>>>>             var data = List[Array[Byte]]()
>>>>>             var usefulRow = false
>>>>>
>>>>>             val cf = Bytes.toBytes("cf")
>>>>>             keys.foreach { key =>
>>>>>                 val col = kv._2.getValue(cf, Bytes.toBytes(key))
>>>>>                 if (col != null)
>>>>>                     usefulRow = true
>>>>>                 data = data :+ col
>>>>>             }
>>>>>
>>>>>             // Emit the row only if at least one interesting column was present
>>>>>             if (usefulRow)
>>>>>                 Some(data)
>>>>>             else
>>>>>                 None
>>>>>         })
>>>>>     }
>>>>>
>>>>>     def main(args: Array[String]) {
>>>>>         val hBaseRDD = init(args)
>>>>>         // hBaseRDD.cache()
>>>>>
>>>>>         println("**** Initial row count " + hBaseRDD.count())
>>>>>         println("**** Rows with interesting fields " + readFields(hBaseRDD).count())
>>>>>     }
>>>>>
>>>>>
>>>>> I am running on a one-node CDH installation.
>>>>>
>>>>> As it is, it takes around 2.5 minutes. But if I comment out
>>>>> 'println("**** Initial row count " + hBaseRDD.count())', it takes
>>>>> around 1.5 minutes.
>>>>>
>>>>> Is it doing the HBase scan twice, once for each 'count' call? How do I
>>>>> improve it?
>>>>>
>>>>> Thanks,
>>>>> -Soumitra.
>>>>>
>>>>
>>>>
>>>
>>
>
