Thanks Evan and Mark for your answers.

As you mention, it’s always best to get things working before thinking about 
optimisation, and that is where we are now: our solution works, but it is 
unacceptably slow.

I have identified two “issues” in our solution for which I could use some 
help (there are probably more, but let’s start with these two ;) :

1) [MEMORY] I’m surprised by the amount of memory used when caching my data 
set. When I load a single 1 GB CSV file, caching it takes about 2.5 GB of RAM, 
and that’s with the simple RDD[String] obtained from sc.textFile.
When I use my Record class to represent each record, it’s even worse.
What can I do about this? I guess I should use more compact structures to 
represent the records (maybe Strings are too heavy and I should use lighter 
types?).
As I said in my previous mail, I’d like more info/examples about how to create 
a “good” serializable class suitable for use in an RDD (with or without Kryo).
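
For instance, is something along these lines the kind of thing I should be 
doing? (Just a sketch on my end, using the serialized storage level mentioned 
in the tuning docs; I haven’t measured anything yet.)

        import org.apache.spark.storage.StorageLevel

        // keep the parsed records in memory in serialized form: a bit more CPU
        // to access, but a much smaller footprint than caching plain Java objects
        val data = sc.textFile("hdfs://data.csv").map(Record(_))
        data.persist(StorageLevel.MEMORY_ONLY_SER)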

2) [HDFS] Saving my results to HDFS using RDD.saveAsTextFile is REALLY slow. 
The RDDs I’m saving are just pairs of (Customer_ID, value), i.e. (String, String).
Should I use the Hadoop SequenceFile format? How can I make the storage efficient?
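
For example, would something like this be a reasonable direction? (A sketch, 
reusing the "variable" pair RDD from my original mail below; the output path 
is made up.)

        import org.apache.spark.SparkContext._   // pair-RDD implicits, incl. saveAsSequenceFile

        // write the (Customer_ID, value) pairs as a Hadoop SequenceFile instead of plain text
        variable.saveAsSequenceFile("hdfs://output/variable_field1_value1")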

Thanks for your help!

Pierre

On 14 Feb 2014, at 17:41, Mark Hamstra <m...@clearstorydata.com> wrote:

> or prdd.filter { case (_, value) => f(value) }
> 
> 
> On Fri, Feb 14, 2014 at 8:30 AM, Evan R. Sparks <evan.spa...@gmail.com> wrote:
> See inline
> 
> 
> On Fri, Feb 14, 2014 at 5:57 AM, Pierre Borckmans 
> <pierre.borckm...@realimpactanalytics.com> wrote:
> 
> Hi everyone!
> 
> We are just starting a new project with Spark and have a few (newbie) 
> questions.
> Any help would be greatly appreciated!
> 
> * First a bit of context:
> 
> We are working on CSV files, about 400M lines, in which each line represents 
> a record (transaction) from customer_A to customer_B, with around 20 fields 
> (transaction duration, value, type, ...).
> We would like to use Spark to extract several hundred simple variables 
> for each customer.
> 
> We have created a Record class to represent the records and ease their 
> manipulation (filtering on fields, handling dates, ...):
>         val f = sc.textFile("hdfs://data.csv")
>         val data: RDD[Record] = f.map(Record(_))
> where Record.apply(String) splits the CSV string and fills the fields 
> accordingly.
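> 
> For reference, a simplified sketch of what this looks like (the real class 
> has around 20 fields; the field names beyond customerA/customerB and the 
> types are just illustrative):
> 
>         case class Record(customerA: String, customerB: String,
>                           duration: Int, value: Double, transactionType: String /* , ... */)
> 
>         object Record {
>           // naive CSV parsing: split on commas and convert each field
>           def apply(line: String): Record = {
>             val fields = line.split(",", -1)
>             Record(fields(0), fields(1), fields(2).toInt, fields(3).toDouble, fields(4))
>           }
>         }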
> 
> Now given a set of (field, value), we want to compute a variable for each 
> customer_A, on records that respect the set of fields and values:
> 
>         // Filter the initial RDD on the given fields and values
>         val filteredData = data.filter(_.check(field1, value1))
>                                .filter(_.check(field2, value2))
>                                .filter(...)
>         // Since we will later want to filter on other combinations of 
>         // fields and values, we would like to persist these RDDs in memory. 
>         // For instance:
>         val filteredData2 = data.filter(_.check(field1, value1))
>                                 .filter(_.check(field3, value3))
>         // should benefit from the RDD already filtered on (field1, value1)
>         // Where should we put the cache? In between each filter / at the 
>         // beginning / at the end?
> 
> You should put your .cache() after the creation of the RDD you plan to 
> re-use. Cache the data you're going to use over and over again. 
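> 
> For example, something along these lines (just a sketch reusing your names; I 
> made up the intermediate val name):
> 
>         // cache the shared intermediate RDD once, right after it is created
>         val filteredOnField1 = data.filter(_.check(field1, value1)).cache()
> 
>         // both of these reuse the cached RDD instead of re-reading the CSV
>         val filteredData  = filteredOnField1.filter(_.check(field2, value2))
>         val filteredData2 = filteredOnField1.filter(_.check(field3, value3))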
> 
> 
>         // Compute the variable of interest, grouped by customer
>         // The formulation is generic, using functions f, g and h
>         // The output is always an RDD of pairs (customerA, variable)
>         val variable = filteredData.map(x => (x.customerA, f(x)))
>                                    .reduceByKey(g)
>                                    .map(h)
> 
>         // For example, to compute the number of records for each customer_A 
>         // with field1=value1, we define:
>         def f(x: Record) = 1
>         def g(x: Int, y: Int) = x + y
>         def h(p: (String, Int)) = p
> 
>         // Another example, to compute the number of distinct customer_Bs 
>         // involved in records with each customer_A, we define:
>         def f(x: Record) = Set(x.customerB)
>         def g(x: Set[String], y: Set[String]) = x ++ y
>         def h(p: (String, Set[String])) = (p._1, p._2.size)
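> 
> To be concrete, the second example spelled out end to end would be roughly 
> this (just a sketch to check that the types line up; the val name is arbitrary):
> 
>         val distinctBCount = filteredData
>           .map(x => (x.customerA, Set(x.customerB)))
>           .reduceByKey(_ ++ _)
>           .map { case (a, bs) => (a, bs.size) }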
> 
> 
> * Now, my questions ;)
> 
> - SERIALIZATION:
>         - Is it OK to manipulate RDDs of Record objects, or should we stick 
> with simple RDDs of Strings and do all the splitting and computation in each 
> transformation?
>         How can we make that efficient in terms of memory and speed?
>         I've read the Tuning docs (including the Kryo serialization part) but 
> I'd like to have more info on that...
> RDD of Record is OK as long as Record is serializable. Kryo may help you here 
> but I'm not sure how you're representing Record so I'm not sure how much. 
> Either way, it's usually best to get it working first and then worry about 
> serialization/performance later. 
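> 
> When you do get to that point, the Kryo setup is roughly the following (a 
> sketch assuming the 0.9-style SparkConf; on older versions you'd set the same 
> properties via System.setProperty, and "mypackage.MyRegistrator" is a placeholder):
> 
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.serializer.KryoRegistrator
> 
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[Record])  // register the classes you store in RDDs
>   }
> }
> 
> val conf = new SparkConf()
>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   .set("spark.kryo.registrator", "mypackage.MyRegistrator")
> val sc = new SparkContext(conf)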
> 
> - PERFORMANCE:
>         - Is it a good idea to perform all the filters first and then do the 
> groupBy on customer, or should we do the reverse?
> Yes, performing filters first is a good idea. Spark doesn't do heavy (SQL 
> style) query optimization where filters are pushed down early in the DAG, 
> etc. If your queries look a lot like database queries, you might think about 
> using Shark for this - it'll save you some pain at setup/serialization and 
> may be much faster if your queries only operate on a few columns at a time. 
>         In the second situation, how can we filter on the values? I didn't 
> see a filterValues method in the PairRDD API?
> You can do a .filter() on the value of a pairRDD like this
> 
> prdd.filter(_._2 == "something") 
> //or
> prdd.filter(r => f(r._2))
> 
> 
> Thanks for any help you can bring us!
> 
> 
> Pierre
> 
> 
> 
