Hey Lucas,

How many unique keys do you have when you do these aggregations? Also, when you look in the web UI, can you tell how much in-memory storage is being used overall by the events RDD and the casRdd?
- Patrick

On Mon, Oct 28, 2013 at 1:21 PM, Lucas Fernandes Brunialti <[email protected]> wrote:

> Hello,
>
> I count events per date/time after that code, like the code below:
>
> JavaPairRDD<String, Integer> eventPerDate = events.map(
>         new PairFunction<Tuple2<String, String>, String, Integer>() {
>             @Override
>             public Tuple2<String, Integer> call(Tuple2<String, String> tuple) throws Exception {
>                 return new Tuple2<String, Integer>(tuple._1, 1);
>             }
>         }).reduceByKey(new Function2<Integer, Integer, Integer>() {
>             @Override
>             public Integer call(Integer i1, Integer i2) throws Exception {
>                 return i1 + i2;
>             }
>         });
>
> Also, there are other jobs that run against that cached RDD `events`. So the jobs that are taking about 4 minutes are the ones run against cached (or partially cached) datasets; the 'aggregator' job that caches the RDD takes about 7 minutes. Most of my log looks like the lines below, so the actions that are taking the time are the subsequent ones:
>
> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Starting task 3.0:2062 as TID 6838 on executor 1: ip-10-8-19-185.ec2.internal (PROCESS_LOCAL)
> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Serialized task 3.0:2062 as 2621 bytes in 0 ms
> 13/10/28 20:01:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_2243 in memory on domU-12-31-39-03-26-31.compute-1.internal:35691 (size: 34.5 MB, free: 1371.7 MB)
> 13/10/28 20:01:05 INFO storage.BlockManagerMasterActor$BlockManagerInfo: Added rdd_2_2259 in memory on domU-12-31-39-03-26-31.compute-1.internal:35691 (size: 6.4 MB, free: 1365.2 MB)
> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Finished TID 6818 in 2545 ms on ip-10-4-231-4.ec2.internal (progress: 2220/2300)
> 13/10/28 20:01:05 INFO scheduler.DAGScheduler: Completed ShuffleMapTask(3, 2123)
> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Starting task 3.0:2157 as TID 6839 on executor 3: ip-10-4-231-4.ec2.internal (PROCESS_LOCAL)
> 13/10/28 20:01:05 INFO cluster.ClusterTaskSetManager: Serialized task 3.0:2157 as 2619 bytes in 1 ms
> 13/10/28 20:01:06 INFO cluster.ClusterTaskSetManager: Finished TID 6782 in 7985 ms on domU-12-31-39-0A-90-F2.compute-1.internal (progress: 2221/2300)
>
> Thank you very much!
>
> Lucas.
>
> 2013/10/28 Patrick Wendell <[email protected]>
>
>> Hey Lucas,
>>
>> This code still needs to read the entire initial dataset from Cassandra, so that's probably what's taking most of the time. Also, it doesn't show here the operations you are actually doing.
>>
>> What happens when you look in the Spark web UI or the logs? Can you tell which stages are taking the most time? Is it the initial load or is it the subsequent actions?
>>
>> - Patrick
>>
>> On Mon, Oct 28, 2013 at 10:05 AM, Lucas Fernandes Brunialti <[email protected]> wrote:
>>
>>> Hi,
>>>
>>> Patrick, we cache the RDD before doing operations. We tried to cache the cassRdd (flatMap) but it doesn't help. A piece of code from one job (we're using Java, so the code is longer) is at the end of the email.
>>>
>>> Ewen, yes, we're re-reading from Cassandra. I found that it is better than serializing and reading the blocks to/from disk, as my dataset doesn't fit in memory...
>>>
>>> I also tried the Kryo serializer, also with no success. We're sharing the events (RDD) with the other jobs. The parameters for the jobs are:
>>>
>>> System.setProperty("spark.storage.memoryFraction", "0.7");
>>> System.setProperty("spark.executor.memory", "12g");
>>> System.setProperty("spark.storage.StorageLevel", "MEMORY_ONLY");
>>>
>>> Thanks!
>>>
>>> Lucas.
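As an aside on the properties quoted above: `spark.storage.StorageLevel` is not a property Spark reads; the storage level is chosen per RDD via `persist()`. Kryo, in the Spark 0.8 era, is enabled through the `spark.serializer` system property. A minimal sketch of the corrected settings (plain Java, no Spark required to run):

```java
public class SparkSettingsSketch {
    // Applies Spark 0.8-era settings (plain JVM system properties) and
    // returns the configured serializer class name for inspection.
    static String applySettings() {
        System.setProperty("spark.storage.memoryFraction", "0.7");
        System.setProperty("spark.executor.memory", "12g");
        // Kryo is enabled via spark.serializer, not via a StorageLevel property:
        System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        // Note: "spark.storage.StorageLevel" is not read by Spark; the storage
        // level is chosen per RDD, e.g. rdd.persist(StorageLevel.MEMORY_ONLY()).
        return System.getProperty("spark.serializer");
    }

    public static void main(String[] args) {
        System.out.println(applySettings());
    }
}
```

The property names here are the Spark 0.8 ones used elsewhere in this thread; later Spark versions moved to `SparkConf`.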
>>> @SuppressWarnings("unchecked")
>>> JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd = context.newAPIHadoopRDD(job.getConfiguration(),
>>>         ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
>>>         ByteBuffer.class, SortedMap.class).persist(StorageLevel.MEMORY_ONLY());
>>>
>>> JavaRDD<Tuple2<String, String>> events = casRdd.flatMap(new FlatMapFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>, Tuple2<String, String>>() {
>>>     @Override
>>>     public Iterable<Tuple2<String, String>> call(Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> tuple) throws Exception {
>>>         ArrayList<Tuple2<String, String>> events = new ArrayList<Tuple2<String, String>>();
>>>         for (ByteBuffer columnName : tuple._2().keySet()) {
>>>             String timestamp = ByteBufferUtil.string(columnName);
>>>             String event = ByteBufferUtil.string(tuple._2().get(ByteBufferUtil.bytes(timestamp)).value());
>>>             Calendar date = Calendar.getInstance();
>>>             date.setTimeInMillis(Long.parseLong(timestamp));
>>>             String key = String.valueOf(date.get(Calendar.DAY_OF_MONTH)) + "-" + String.valueOf(date.get(Calendar.MONTH));
>>>             events.add(new Tuple2<String, String>(key, event));
>>>         }
>>>         return events;
>>>     }
>>> });
>>>
>>> events = events.filter(new Function<Tuple2<String, String>, Boolean>() {
>>>     @Override
>>>     public Boolean call(Tuple2<String, String> tuple) throws Exception {
>>>         return tuple._2.contains("ARTICLE");
>>>     }
>>> }).persist(StorageLevel.MEMORY_ONLY());
>>>
>>> //other operations
>>>
>>> 2013/10/28 Ewen Cheslack-Postava <[email protected]>
>>>
>>>> Well, he did mention that not everything was staying in the cache, so even with an ongoing job they're probably re-reading from Cassandra. It sounds to me like the first issue to address is why things are being evicted.
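One subtlety in the key derivation in the flatMap above: `Calendar.MONTH` is zero-based, so October events produce keys like "28-9", and `Calendar.getInstance()` with no arguments uses the worker's local time zone, so keys can differ across machines. A small stand-alone sketch of that derivation (no Spark needed; the UTC pin and the sample timestamp are illustrative, not from the original code):

```java
import java.util.Calendar;
import java.util.TimeZone;

public class DateKey {
    // Mirrors the key derivation in the flatMap above, but pins UTC so the
    // result doesn't depend on the worker's local time zone.
    static String dayMonthKey(long epochMillis) {
        Calendar date = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        date.setTimeInMillis(epochMillis);
        // Calendar.MONTH is zero-based: January == 0, October == 9.
        return date.get(Calendar.DAY_OF_MONTH) + "-" + date.get(Calendar.MONTH);
    }

    public static void main(String[] args) {
        // 2013-10-28T00:00:00Z
        System.out.println(dayMonthKey(1382918400000L)); // prints "28-9" (month 9 == October)
    }
}
```

Adding 1 to the month (or using a formatter) would make the keys read naturally as day-month.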
>>>>
>>>> -Ewen
>>>>
>>>> -----
>>>> Ewen Cheslack-Postava
>>>> StraightUp | http://readstraightup.com
>>>> [email protected]
>>>> (201) 286-7785
>>>>
>>>> On Mon, Oct 28, 2013 at 9:24 AM, Patrick Wendell <[email protected]> wrote:
>>>>
>>>>> Hey Lucas,
>>>>>
>>>>> Could you provide some rough pseudo-code for your job? One question is: are you loading the data from Cassandra every time you perform an action, or do you cache() the dataset first? If you have a dataset that's already in an RDD, it's very hard for me to imagine that filters and aggregations could possibly take 4 minutes... it should be more like seconds.
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Mon, Oct 28, 2013 at 9:11 AM, Lucas Fernandes Brunialti <[email protected]> wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> We're using Spark to run analytics and ML jobs against Cassandra. Our analytics jobs are simple (filters and counts) and we're trying to improve the performance; these jobs take around 4 minutes querying 160 GB (the size of our dataset). Also, we use 5 workers and 1 master, EC2 m1.xlarge with 8 GB of JVM heap.
>>>>>>
>>>>>> We tried to increase the JVM heap to 12 GB, but we had no gain in performance. We're using MEMORY_ONLY (after some tests we found it better), but it's not caching everything, just around 1000 of 2500 blocks. Maybe it's not the cache that's limiting performance, just the Cassandra IO (?)
>>>>>>
>>>>>> I saw that the people from Ooyala can do analytics jobs in milliseconds (http://www.youtube.com/watch?v=6kHlArorzvs), any advice?
>>>>>>
>>>>>> Appreciate the help!
>>>>>>
>>>>>> Lucas.
>>>>>>
>>>>>> --
>>>>>>
>>>>>> Lucas Fernandes Brunialti
>>>>>>
>>>>>> *Dev/Ops Software Engineer*
>>>>>>
>>>>>> *+55 9 6512 4514*
>>>>>>
>>>>>> *[email protected]* <[email protected]>
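A rough back-of-the-envelope on the figures in this thread (assuming the stated 5 workers, the 12 GB executor heap they tried, and `spark.storage.memoryFraction` 0.7) shows why only part of the dataset stays cached:

```java
public class CacheCapacitySketch {
    // Approximate storage memory available for cached RDD blocks across
    // the cluster: workers * heap * memoryFraction.
    static double cacheGb(int workers, double heapGb, double memoryFraction) {
        return workers * heapGb * memoryFraction;
    }

    public static void main(String[] args) {
        double cache = cacheGb(5, 12.0, 0.7);  // ~42 GB of cache space
        double datasetGb = 160.0;              // dataset size from the thread
        System.out.printf("cache=%.1f GB (%.0f%% of dataset)%n",
                cache, 100 * cache / datasetGb);
        // Well under 160 GB, and deserialized Java objects usually occupy more
        // memory than the on-disk data, so block eviction is expected here.
    }
}
```

This is only a first-order estimate, but it is consistent with the reported ~1000 of 2500 blocks cached and suggests serialized caching (e.g. MEMORY_ONLY_SER with Kryo) or more memory would be needed to cache the full dataset.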
