Found the issue: the splits in HBase were not uniform, so one task was taking 90% of the time.
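For reference, here is a rough sketch (my own, not from the thread) of how the skew can be confirmed from the driver. It assumes hBaseRDD is the RDD built in the code quoted below, and the repartition count of 16 is just an arbitrary example:

// Rough sketch (assumption): count how many rows each HBase region split
// contributes, to confirm that one partition is doing most of the work.
val rowsPerPartition = hBaseRDD
  .mapPartitionsWithIndex((idx, iter) => Iterator((idx, iter.size)))
  .collect()

rowsPerPartition.foreach { case (idx, n) =>
  println("partition " + idx + ": " + n + " rows")
}

// One fix is to pre-split the HBase table more evenly; another is to
// repartition after mapping the rows to serializable values, e.g.
//   readFields(hBaseRDD).repartition(16)
// (repartitioning the raw hBaseRDD directly may fail, since Result objects
// are not serializable in older HBase versions.)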
BTW, is there a way to save the details available on port 4040 after the job is finished?

On Tue, Feb 25, 2014 at 7:26 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:

> It's tricky really, since you may not know upfront how much data is in
> there. You could possibly take a look at how much data is in the HBase
> tables to get an idea.
>
> It may take a bit of trial and error, like running out of memory trying to
> cache the dataset, and checking the Spark UI on port 4040 to see how much
> is cached and how much memory still remains available, etc. You should
> also take a look at http://spark.apache.org/docs/latest/tuning.html for
> ideas around memory and serialization tuning.
>
> Broadly speaking, what you want to do is filter as much data as possible
> first and cache the subset of data on which you'll be performing multiple
> passes or computations. For example, based on your code above, you may in
> fact only wish to cache the "interesting" fields RDD. It all depends on
> what you're trying to achieve.
>
> If you will only be doing one pass through the data anyway (like running a
> count every time on the full dataset), then caching is not going to help
> you.
>
>
> On Tue, Feb 25, 2014 at 4:59 PM, Soumitra Kumar <kumar.soumi...@gmail.com> wrote:
>
>> Thanks Nick.
>>
>> How do I figure out if the RDD fits in memory?
>>
>>
>> On Tue, Feb 25, 2014 at 1:04 AM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>
>>> cache only caches the data on the first action (count) - the first time,
>>> it still needs to read the data from the source. So the first time you
>>> call count, it will take the same amount of time whether cache is enabled
>>> or not. The second time you call count on a cached RDD, you should see
>>> that it takes a lot less time (assuming that the data fit in memory).
>>>
>>>
>>> On Tue, Feb 25, 2014 at 9:38 AM, Soumitra Kumar <kumar.soumi...@gmail.com> wrote:
>>>
>>>> I did try with 'hBaseRDD.cache()', but I don't see any improvement.
>>>>
>>>> My expectation is that with cache enabled, there should not be any
>>>> penalty for the 'hBaseRDD.count' call.
>>>>
>>>>
>>>> On Mon, Feb 24, 2014 at 11:29 PM, Nick Pentreath <nick.pentre...@gmail.com> wrote:
>>>>
>>>>> Yes, you're initiating a scan for each count call. The normal way to
>>>>> improve this would be to use cache(), which is what you have in your
>>>>> commented-out line:
>>>>> // hBaseRDD.cache()
>>>>>
>>>>> If you uncomment that line, you should see an improvement overall.
>>>>>
>>>>> If caching is not an option for some reason (maybe the data is too
>>>>> large), then you can implement an overall count in your readFields
>>>>> method using accumulators:
>>>>>
>>>>> val count = sc.accumulator(0L)
>>>>> ...
>>>>> In your flatMap function, do count += 1 for each row (regardless of
>>>>> whether it is "interesting" or not).
>>>>>
>>>>> In your main method, after doing an action (e.g. count in your case),
>>>>> call val totalCount = count.value.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 25, 2014 at 9:15 AM, Soumitra Kumar <kumar.soumi...@gmail.com> wrote:
>>>>>
>>>>>> I have code which reads an HBase table and counts the number of rows
>>>>>> containing a field.
>>>>>>
>>>>>> def readFields(rdd : RDD[(ImmutableBytesWritable, Result)]) : RDD[List[Array[Byte]]] = {
>>>>>>     return rdd.flatMap(kv => {
>>>>>>         // Set of interesting keys for this use case
>>>>>>         val keys = List("src")
>>>>>>         var data = List[Array[Byte]]()
>>>>>>         var usefulRow = false
>>>>>>
>>>>>>         val cf = Bytes.toBytes("cf")
>>>>>>         keys.foreach { key =>
>>>>>>             val col = kv._2.getValue(cf, Bytes.toBytes(key))
>>>>>>             if (col != null)
>>>>>>                 usefulRow = true
>>>>>>             data = data :+ col
>>>>>>         }
>>>>>>
>>>>>>         if (usefulRow)
>>>>>>             Some(data)
>>>>>>         else
>>>>>>             None
>>>>>>     })
>>>>>> }
>>>>>>
>>>>>> def main(args: Array[String]) {
>>>>>>     val hBaseRDD = init(args)
>>>>>>     // hBaseRDD.cache()
>>>>>>
>>>>>>     println("**** Initial row count " + hBaseRDD.count())
>>>>>>     println("**** Rows with interesting fields " + readFields(hBaseRDD).count())
>>>>>> }
>>>>>>
>>>>>>
>>>>>> I am running on a one-node CDH installation.
>>>>>>
>>>>>> As it is, it takes around 2.5 minutes. But if I comment out
>>>>>> 'println("**** Initial row count " + hBaseRDD.count())', it takes
>>>>>> around 1.5 minutes.
>>>>>>
>>>>>> Is it doing the HBase scan twice, for both 'count' calls? How do I
>>>>>> improve it?
>>>>>>
>>>>>> Thanks,
>>>>>> -Soumitra.
>>>>>
>>>>>
>>>>
>>>
>>
>
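For completeness, below is a minimal sketch of the accumulator approach Nick describes above, folded into the readFields function from the quoted code so that a single scan yields both counts. Passing sc in and returning the accumulator is my own wiring, not something from the thread:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Sketch: count every row inside the same flatMap that filters, so one
// HBase scan (triggered by a single action) produces both the total row
// count and the "interesting" row count.
def readFields(sc: SparkContext,
               rdd: RDD[(ImmutableBytesWritable, Result)]) = {
  val totalRows = sc.accumulator(0L) // incremented for every row scanned

  val interesting = rdd.flatMap { kv =>
    totalRows += 1L

    // Set of interesting keys for this use case
    val keys = List("src")
    val cf = Bytes.toBytes("cf")
    var data = List[Array[Byte]]()
    var usefulRow = false

    keys.foreach { key =>
      val col = kv._2.getValue(cf, Bytes.toBytes(key))
      if (col != null)
        usefulRow = true
      data = data :+ col
    }

    if (usefulRow) Some(data) else None
  }

  (interesting, totalRows)
}

// Usage sketch: read the accumulator only after the action has run.
// val (interesting, totalRows) = readFields(sc, hBaseRDD)
// println("**** Rows with interesting fields " + interesting.count())
// println("**** Initial row count " + totalRows.value)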