Found the issue: the region splits in HBase were not uniform, so one job was
taking 90% of the time.

BTW, is there a way to save the details available on port 4040 after the job
is finished?


On Tue, Feb 25, 2014 at 7:26 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:

> It's tricky really since you may not know upfront how much data is in
> there. You could possibly take a look at how much data is in the HBase
> tables to get an idea.
>
> It may take a bit of trial and error, like running out of memory trying to
> cache the dataset, and checking the Spark UI on port 4040 to see how much
> is cached and how much memory still remains available, etc etc. You should
> also take a look at http://spark.apache.org/docs/latest/tuning.html for
> ideas around memory and serialization tuning.
>
> Broadly speaking, what you want to try to do is filter as much data as
> possible first and cache the subset of data on which you'll be performing
> multiple passes or computations. For example, based on your code above, you
> may in fact only wish to cache the data that is the "interesting" fields
> RDD. It all depends on what you're trying to achieve.
>
> If you will only be doing one pass through the data anyway (like running a
> count every time on the full dataset) then caching is not going to help you.
>
>
> On Tue, Feb 25, 2014 at 4:59 PM, Soumitra Kumar 
> <kumar.soumi...@gmail.com> wrote:
>
>> Thanks Nick.
>>
>> How do I figure out if the RDD fits in memory?
>>
>>
>> On Tue, Feb 25, 2014 at 1:04 AM, Nick Pentreath <nick.pentre...@gmail.com
>> > wrote:
>>
>>> cache only caches the data on the first action (count) - the first time
>>> it still needs to read the data from the source. So the first time you call
>>> count it will take the same amount of time whether cache is enabled or not.
>>> The second time you call count on a cached RDD, you should see that it
>>> takes a lot less time (assuming that the data fit in memory).
>>>
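[Editor's note] The behaviour Nick describes (the first action pays the full read cost; only later actions benefit from the cache) can be sketched in plain Scala with a memoized value standing in for a cached RDD. The names `reads` and `data` and the toy dataset are illustrative, not Spark API:

```scala
// Memoization sketch: the expensive "scan" runs once, on first access,
// and every later access reuses the in-memory result -- mirroring how
// cache() only speeds things up from the second action onwards.
var reads = 0                                     // how many times the source was read
lazy val data = { reads += 1; (1 to 5).toList }   // stands in for a cached RDD
val first  = data.size                            // first "count": triggers the read
val second = data.size                            // second "count": served from memory
println(s"reads=$reads first=$first second=$second")
```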
>>>
>>> On Tue, Feb 25, 2014 at 9:38 AM, Soumitra Kumar <
>>> kumar.soumi...@gmail.com> wrote:
>>>
>>>> I did try with 'hBaseRDD.cache()', but don't see any improvement.
>>>>
>>>> My expectation is that with cache enabled, there should not be any
>>>> penalty for the 'hBaseRDD.count' call.
>>>>
>>>>
>>>>
>>>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <
>>>> nick.pentre...@gmail.com> wrote:
>>>>
>>>>> Yes, you're initiating a scan for each count call. The normal way to
>>>>> improve this would be to use cache(), which is what you have in your
>>>>> commented out line:
>>>>> // hBaseRDD.cache()
>>>>>
>>>>> If you uncomment that line, you should see an improvement overall.
>>>>>
>>>>> If caching is not an option for some reason (maybe data is too large),
>>>>> then you can implement an overall count in your readFields method using
>>>>> accumulators:
>>>>>
>>>>> val count = sc.accumulator(0L)
>>>>> ...
>>>>> In your flatMap function do count += 1 for each row (regardless of
>>>>> whether "interesting" or not).
>>>>>
>>>>> In your main method after doing an action (e.g. count in your case),
>>>>> call val totalCount = count.value.
>>>>>
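[Editor's note] Since the accumulator version needs a live SparkContext to run, here is a plain-Scala sketch of the same single-pass idea: count every row inside the very flatMap that filters, so no second scan is needed. The row data and the "src" key are illustrative:

```scala
// Count all rows while keeping only the "interesting" ones in one pass --
// the role the Spark accumulator plays inside readFields.
var total = 0L  // plays the part of sc.accumulator(0L)
val rows = List(Map("src" -> "a"), Map("other" -> "b"), Map("src" -> "c"))
val interesting = rows.flatMap { row =>
  total += 1                   // counted whether "interesting" or not
  row.get("src").map(List(_))  // Some(...) only when the key is present
}
println(s"total=$total interesting=${interesting.size}")
```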
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <
>>>>> kumar.soumi...@gmail.com> wrote:
>>>>>
>>>>>> I have a code which reads an HBase table, and counts number of rows
>>>>>> containing a field.
>>>>>>
>>>>>>     def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) :
>>>>>> RDD[List[Array[Byte]]] = {
>>>>>>         return rdd.flatMap(kv => {
>>>>>>             // Set of interesting keys for this use case
>>>>>>             val keys = List ("src")
>>>>>>             var data = List[Array[Byte]]()
>>>>>>             var usefulRow = false
>>>>>>
>>>>>>             val cf = Bytes.toBytes ("cf")
>>>>>>             keys.foreach {key =>
>>>>>>                 val col = kv._2.getValue(cf, Bytes.toBytes(key))
>>>>>>                 if (col != null)
>>>>>>                     usefulRow = true
>>>>>>                 data = data :+ col
>>>>>>             }
>>>>>>
>>>>>>             if (usefulRow)
>>>>>>                 Some(data)
>>>>>>             else
>>>>>>                 None
>>>>>>         })
>>>>>>     }
>>>>>>
>>>>>>     def main(args: Array[String]) {
>>>>>>         val hBaseRDD = init(args)
>>>>>>         // hBaseRDD.cache()
>>>>>>
>>>>>>         println("**** Initial row count " + hBaseRDD.count())
>>>>>>         println("**** Rows with interesting fields " +
>>>>>> readFields(hBaseRDD).count())
>>>>>>   }
>>>>>>
>>>>>>
>>>>>> I am running on a one-node CDH installation.
>>>>>>
>>>>>> As it is it takes around 2.5 minutes. But if I comment out
>>>>>> 'println("**** Initial row count " + hBaseRDD.count())', it takes around
>>>>>> 1.5 minutes.
>>>>>>
>>>>>> Is it doing HBase scan twice, for both 'count' calls? How do I
>>>>>> improve it?
>>>>>>
>>>>>> Thanks,
>>>>>> -Soumitra.
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
