Hi,
Patrick, we cache the RDD before running any operations. We also tried
caching the casRdd (flatMap), but that doesn't help. The code for one of the
jobs is at the end of this email (we're using Java, so it is fairly long).
Ewen, yes, we're re-reading from Cassandra. I found that to be better than
serializing the blocks and reading them back from disk, since my dataset
doesn't fit in memory...
I also tried the KryoSerializer, also with no success. We share the
events RDD with the other jobs. The parameters for the jobs are:
System.setProperty("spark.storage.memoryFraction", "0.7");
System.setProperty("spark.executor.memory", "12g");
System.setProperty("spark.storage.StorageLevel", "MEMORY_ONLY");
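As a rough sanity check on those settings, the sketch below estimates the cluster-wide cache budget from the figures mentioned in this thread (5 workers, a 12g executor heap, a memoryFraction of 0.7, and a 160 Gb dataset). The class and method names are hypothetical, and it assumes the in-memory footprint of the cached RDD equals the stored dataset size, which it usually does not (deserialized Java objects tend to be larger), so treat the result as an optimistic upper bound only.

```java
// Hypothetical helper, not part of Spark: back-of-the-envelope cache budget.
class CacheCapacityEstimate {

    // Heap available for cached blocks across the cluster, in GB.
    static double storageGb(int workers, double executorMemoryGb, double memoryFraction) {
        return workers * executorMemoryGb * memoryFraction;
    }

    // Share of the dataset that could be cached at best (capped at 1.0),
    // ASSUMING the cached data takes as much space as the stored dataset.
    static double cacheableShare(double storageGb, double datasetGb) {
        return Math.min(1.0, storageGb / datasetGb);
    }

    public static void main(String[] args) {
        double storage = storageGb(5, 12.0, 0.7);       // figures from this thread
        double share = cacheableShare(storage, 160.0);  // dataset size from this thread
        System.out.printf("storage budget: %.1f GB, at most %.0f%% of the dataset%n",
                storage, share * 100);
    }
}
```

With a budget this far below the dataset size, MEMORY_ONLY cannot hold every block, so some partitions will be recomputed (here, re-read from Cassandra) on each action.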
Thanks!
Lucas.
@SuppressWarnings("unchecked")
JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> casRdd =
    context.newAPIHadoopRDD(job.getConfiguration(),
        ColumnFamilyInputFormat.class
            .asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
        ByteBuffer.class, SortedMap.class).persist(StorageLevel.MEMORY_ONLY());

JavaRDD<Tuple2<String, String>> events = casRdd.flatMap(
    new FlatMapFunction<Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>>,
        Tuple2<String, String>>() {
      @Override
      public Iterable<Tuple2<String, String>> call(
          Tuple2<ByteBuffer, SortedMap<ByteBuffer, IColumn>> tuple) throws Exception {
        ArrayList<Tuple2<String, String>> events =
            new ArrayList<Tuple2<String, String>>();
        for (ByteBuffer columnName : tuple._2().keySet()) {
          String timestamp = ByteBufferUtil.string(columnName);
          String event = ByteBufferUtil.string(
              tuple._2().get((ByteBufferUtil.bytes(timestamp))).value());
          Calendar date = Calendar.getInstance();
          date.setTimeInMillis(Long.parseLong(timestamp));
          String key = String.valueOf(date.get(Calendar.DAY_OF_MONTH)) + "-" +
              String.valueOf(date.get(Calendar.MONTH));
          events.add(new Tuple2<String, String>(key, event));
        }
        return events;
      }
    });

events = events.filter(new Function<Tuple2<String, String>, Boolean>() {
  @Override
  public Boolean call(Tuple2<String, String> tuple) throws Exception {
    return tuple._2.contains("ARTICLE");
  }
}).persist(StorageLevel.MEMORY_ONLY());

// other operations
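
One detail of the key built in the flatMap above is easy to check in isolation: Calendar.MONTH is zero-based, and Calendar.getInstance() uses the JVM's default time zone. The standalone sketch below reproduces the same "day-month" key with plain java.util classes. The class and method names are hypothetical, and the time zone is passed explicitly only to make the result reproducible (the job code relies on the default zone).

```java
import java.util.Calendar;
import java.util.TimeZone;

// Hypothetical standalone helper mirroring the key logic of the flatMap above.
class DayMonthKey {

    // Same layout as the job: "<day of month>-<Calendar.MONTH>", month is 0..11.
    static String rawKey(long millis, TimeZone zone) {
        Calendar date = Calendar.getInstance(zone);
        date.setTimeInMillis(millis);
        return date.get(Calendar.DAY_OF_MONTH) + "-" + date.get(Calendar.MONTH);
    }

    // Variant with a one-based month (1..12), in case human-readable keys are wanted.
    static String calendarKey(long millis, TimeZone zone) {
        Calendar date = Calendar.getInstance(zone);
        date.setTimeInMillis(millis);
        return date.get(Calendar.DAY_OF_MONTH) + "-" + (date.get(Calendar.MONTH) + 1);
    }

    public static void main(String[] args) {
        TimeZone utc = TimeZone.getTimeZone("UTC");
        long noonOct28 = 1382961600000L; // 2013-10-28 12:00:00 UTC
        System.out.println(rawKey(noonOct28, utc));
        System.out.println(calendarKey(noonOct28, utc));
    }
}
```

Either variant groups events consistently as long as every job uses the same one; the difference only matters when the keys are read by people or joined against dates produced elsewhere.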
2013/10/28 Ewen Cheslack-Postava <[email protected]>
> Well, he did mention that not everything was staying in the cache, so even
> with an ongoing job they're probably re-reading from Cassandra. It
> sounds to me like the first issue to address is why things are being
> evicted.
>
> -Ewen
>
> -----
> Ewen Cheslack-Postava
> StraightUp | http://readstraightup.com
> [email protected]
> (201) 286-7785
>
>
> On Mon, Oct 28, 2013 at 9:24 AM, Patrick Wendell <[email protected]> wrote:
>
>> Hey Lucas,
>>
>> Could you provide some rough pseudo-code for your job? One question is:
>> are you loading the data from Cassandra every time you perform an action,
>> or do you cache() the dataset first? If you have a dataset that's already
>> in an RDD, it's very hard for me to imagine that filters and aggregations
>> could possibly take 4 minutes... should be more like seconds.
>>
>> - Patrick
>>
>>
>> On Mon, Oct 28, 2013 at 9:11 AM, Lucas Fernandes Brunialti <
>> [email protected]> wrote:
>>
>>> Hello,
>>>
>>> We're using Spark to run analytics and ML jobs against Cassandra. Our
>>> analytics jobs are simple (filters and counts) and we're trying to improve
>>> their performance: these jobs take around 4 minutes to query 160 GB (the
>>> size of our dataset). We use 5 workers and 1 master, EC2 m1.xlarge
>>> instances with 8 GB of JVM heap.
>>>
>>> We tried increasing the JVM heap to 12 GB, but saw no gain in
>>> performance. We're using MEMORY_ONLY (after some tests we found it works
>>> best), but it's not caching everything, just around 1000 of 2500 blocks.
>>> Maybe the cache is not what limits performance, just the Cassandra I/O (?)
>>>
>>> I saw that people from Ooyala can run analytics jobs in milliseconds (
>>> http://www.youtube.com/watch?v=6kHlArorzvs). Any advice?
>>>
>>> Appreciate the help!
>>>
>>> Lucas.
>>>
>>> --
>>>
>>> Lucas Fernandes Brunialti
>>>
>>> *Dev/Ops Software Engineer*
>>>
>>> *+55 9 6512 4514*
>>>
>>> *[email protected]* <[email protected]>
>>>
>>
>>
>
--
Lucas Fernandes Brunialti
*Dev/Ops Software Engineer*
*+55 11 96512 4514*
*[email protected]* <[email protected]>