First of all, thank you very much; this is a vibrant community.

Sorry for being so terse in my original question.


The dataset is going to get bigger. We are processing the Mexican Economic
Census data, and some of the datasets run to millions of records; maybe that
is not enough to call it Big Data, but it is big enough to strain R and other
traditional statistical software.


I'm at the National Statistical Office of México, and we are doing a proof
of concept with Spark to see whether it is a good solution for this kind of
problem in my institution, which is closer to statistical analysis as in R
than to data processing in the Map-Reduce style. I think Spark is going to
mature in the near future in the MLlib direction, with more support for
vectorized operations "like" R's, thinking of Spark as the R for Big Data.


OK, thank you Imran, let me reply to your points one by one:

1) Yes, you are right, we are getting a feel for Spark with a small data
set. Once the Spark implementation is stable enough, we are going to
increase the size of the data set and the size of the cluster. But 30
minutes is a bad smell about my implementation of the algorithm.

2) Yes, I'm trying to understand the Spark way. In R everything is a vector
and of course nothing is clustered. I see R as a means to communicate
algorithms and ideas with the statistical research area; they are strong in
R and in statistics.

Next, the meanings of the R variables are:

x  # vector with 283,469 observations (Double) from which the density
estimation will be made

x2 # vector of 512 points where the density will be estimated (Double)

w # vector with 283,469 weights, one per observation (Double)

h # the bandwidth, a single number (Double)



and in Scala:

id_x  // org.apache.spark.rdd.RDD[(java.lang.String, Double)] with 283,469
observations, where the ugly String is the unconverted consecutive integer
that lets me join the vectors id_x and id_w

x2 // 512 values in a scala.collection.immutable.NumericRange[Double], where
the density will be estimated

id_w // the 283,469 weights with the ugly String id, corresponding one to
one with the id in id_x

h // one Double for the bandwidth



As you can see, there are some places in the Scala algorithm where
element-by-element operations are needed, like in:

        val z = id_x.mapValues(x=>(x2j-x)/h)



where z is a new vector with the element-by-element arithmetic
transformation of the id_x vector, which still carries the string id. Maybe,
as Evan says, in this place we could manually split the vector into jblas
subvectors and run vectorized operations to avoid the implicit
element-by-element iteration, but at the moment I don't see how to do that.
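
Something like this is my best guess of what Evan means (just a sketch, not
code I have tested; it assumes the x values live in a plain RDD[Double]
without the string id, and transformBlocks is a name I made up):

    // Rough idea: pack each partition into a jblas DoubleMatrix and apply
    // (x2j - x) / h to the whole block at once instead of element by element.
    import org.apache.spark.rdd.RDD
    import org.jblas.DoubleMatrix

    def transformBlocks(xs: RDD[Double], x2j: Double, h: Double): RDD[DoubleMatrix] =
      xs.mapPartitions { iter =>
        val block = new DoubleMatrix(iter.toArray) // one dense vector per partition
        Iterator(block.rsub(x2j).div(h))           // rsub: x2j - block, elementwise
      }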

The other place where element-by-element operations are done is here:

        z.mapValues(gau(_)).join(id_w).map(x=>(x._2._1*x._2._2)).sum



This is where the most interesting thing happens in terms of parallelism,
and where the optimizations could be done, but I don't know exactly how. The
idea is that an element-by-element operation between two big vectors is
needed; in R it is simple:

        sum(w*gau(z))



If I understand correctly, that is exactly what Evan recommends via the
ZippedRDD, and it would let me use the vectors directly, without the ugly
string id.
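
For example (only a sketch of what I understand, assuming x and w are kept
as two plain RDD[Double] of the same length with identical partitioning,
which is what zip requires, and that the total weight is computed once up
front):

    // Sketch of sum(w * gau((x2j - x) / h)) without the string ids and without a join.
    import org.apache.spark.rdd.RDD
    import org.apache.spark.SparkContext._   // brings in .sum on RDD[Double]

    def gau(z: Double): Double =
      math.exp(-(z * z) / 2) / math.sqrt(2 * math.Pi)

    def kernelAt(x2j: Double, x: RDD[Double], w: RDD[Double], h: Double, sumW: Double): Double = {
      val weightedSum = x.zip(w)             // pairs (x_i, w_i), no shuffle
        .map { case (xi, wi) => wi * gau((x2j - xi) / h) }
        .sum
      weightedSum / (sumW * h)               // same normalization as the R code
    }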



3) Thanks for the idea.

4) Maybe. We are close to finishing the Scala-Spark implementation, so
perhaps it is not necessary to do a pure Scala implementation (though a
plain Scala baseline, like the sketch after this list, could still make the
comparison fairer).

5) Thank you, we are going to improve the R code.
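
And about 4), just in case it helps the comparison, a direct single-machine
Scala translation of the R loop could look roughly like this (a sketch, with
made-up names like kde):

    // Plain Scala (no Spark) version of the R kernel density loop, as a baseline.
    def gau(z: Double): Double =
      math.exp(-(z * z) / 2) / math.sqrt(2 * math.Pi)

    def kde(x: Array[Double], w: Array[Double], x2: Seq[Double], h: Double): Seq[Double] = {
      val sumW = w.sum
      x2.map { x2j =>
        var acc = 0.0
        var i = 0
        while (i < x.length) {          // sum(w * gau((x2j - x) / h))
          acc += w(i) * gau((x2j - x(i)) / h)
          i += 1
        }
        acc / (sumW * h)                // same normalization as ker[j] in R
      }
    }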



And finally, Evan, could you give some hints about how to pack the vectors
into jblas DoubleMatrix objects? I mean, how is it possible to control the
partitioning? I would really appreciate any direction on that.
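
Just to show where I am, my (possibly wrong) guess is that the number of
partitions is fixed when the RDD is created (or changed with coalesce), and
that mapPartitions can then pack each partition into one DoubleMatrix,
something like this (xLocal is only a made-up name for the local array of
observations):

    import org.jblas.DoubleMatrix

    val numBlocks = 8                                  // e.g. one block per core
    val xBlocks = sc.parallelize(xLocal, numBlocks)    // xLocal: Array[Double] on the driver
      .mapPartitions(iter => Iterator(new DoubleMatrix(iter.toArray)))
    // xBlocks: RDD[DoubleMatrix], one contiguous dense vector per partition

But I'm not sure this is the right way, so any correction is welcome.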

Thanks again to the Spark community!!!


On Wed, Dec 11, 2013 at 5:24 PM, Evan R. Sparks <[email protected]> wrote:

> Agreed with Imran - without knowing the size/shape of the objects in your
> program, it's tough to tell where the bottleneck is. Additionally, unless
> the problem is really big, (in terms of your w and x vectors), it's
> unlikely that you're going to be CPU bound on the cluster - communication
> and stragglers become the dominant effect here.
>
> I also agree that the join is causing a big bottleneck.
>
> Even after eliminating the join and with really big w and x vectors,
> though, you may still see spark run "slow" compared to your R program. This
> may be because many of the operations you use in R are vectorized, and with
> the spark code, you instead seem to be making your vectors a collection of
> (offset,value) pairs.
>
> There are several alternative strategies here:
> 1) Instead of a join, use a zip() if the two RDDs are the same size and
> are partitioned identically. This will be much faster than a join.
>  2) An alternative strategy that could allow you to get vectorized ops in
> spark while still operating in a distributed fashion is to pack your
> partitions into something like a jblas DoubleMatrix, one per partition,
> where each partition is a contiguous block of elements. Then you can apply
> your gaussian transformation and elementwise multiply, etc. using jblas,
> which links in blas - a low-level linear algebra library (not unlike what R
> is using under the covers) for speed.
>
> Again - with 2 you could still do it as a zip.
>
> For examples of using jblas with spark code, take a look at the mllib
> codebase in the spark repository on git.
>
> - Evan
>
>
> On Wed, Dec 11, 2013 at 4:51 PM, Imran Rashid <[email protected]> wrote:
>
>> these are just thoughts off the top of my head:
>>
>> 1) if the original R code runs in 3 secs, you are unlikely to be able to
>> improve that drastically with spark.  Yes, spark can run sub-second jobs,
>> but no matter what, don't expect spark to get you into the < 10 millisecond
>> range.  While spark has much lower overhead than hadoop at running jobs,
>> there is definitely still significant overhead.  It's really meant to speed
>> up bigger computations.  (Still, I understand the desire to start w/
>> something small to get a feel for it.)
>>
>> 2) Spark is doing a *lot* more work the way you have it coded now.  I'm
>> not entirely sure what is going on b/c I can't see what all the variables
>> mean.  But at the very least, the spark code is dealing w/ different data,
>> since I see Strings are involved in the spark code.
>>
>> In the R code, can you tell me the meaning of & size of:
>> x
>> x2
>> w
>> h
>>
>> and in the scala code, similarly for:
>> x2
>> id_x
>> id_w
>> h
>>
>> The join() in spark is the main thing that is really slow -- b/c it's
>> designed for big data sets, it involves serializing data to a file and
>> reading it back.  That is almost certainly what is taking a long time.  And
>> you are doing one join per element in x.  Again, without knowing the
>> meaning of each of those variables, hard for me to say another way of doing
>> it ... given that the R code doesn't have any strings or joins involved,
>> you probably don't need them in spark.
>>
>> 3) If the problem really is small, maybe it doesn't make sense to use
>> spark (related to #1).  Given the overhead associated w/ each task, you
>> want them to be non-trivial amounts of work.  Eg., maybe a better example
>> of using spark to speed this example up would be doing lots of kernel
>> density estimates w/ the different bandwidths, and then choosing the best
>> one.  pseudo-code:
>>
>> val data : Array[Double] = ...
>> val bcData = sc.broadcast(data)
>> val bandwidths = ... //eg (0 to 1000).map{_ / 1000.0}
>> def computeEstimateAndGetError(data: Array[Double], bandwidth:Double):
>> Double = {
>>   // totally normal scala code goes here, not involving spark at all.
>> compute the density estimate on part of the data, get the error on the rest
>> of the data, and return the error
>>   // maybe even do it on a bunch of different samples of data, eg. w/
>> cross-validation
>>   ...
>>   // just return the error
>> }
>>
>> //now use spark to search all bandwidths
>> val (bestBandwidth, bestError) = sc.parallelize(bandwidths).map{
>> bandwidth =>
>>   bandwidth -> computeEstimateAndGetError(bcData.value, bandwidth)
>> }.collect.minBy{_._2}
>>
>> (still, maybe not a great example, since you could do something smarter
>> than just a grid search, but hopefully that gives you an idea)
>>
>> 4) kind of a follow on to #3, but your first step might be to just write
>> some normal scala code to really compute the exact same thing as R for
>> comparison.
>>
>> 5) The original R code can be improved: You are copying ker a lot,
>> because you keep appending only one element to it.  Better to allocate it
>> at the right size in the first place:
>> ker <- numeric(length(x2))
>> (depending on the size of x2, this could be a major bottleneck, as it
>> requires length(x2)^2 operations)
>>
>>
>>
>>
>> On Wed, Dec 11, 2013 at 3:26 PM, ABEL ALEJANDRO CORONADO IRUEGAS <
>> [email protected]> wrote:
>>
>>> We are trying to implement a univariate kernel density estimation in
>>> Spark. Our first step was to implement it in R from scratch; here is the
>>> relevant code:
>>>
>>> ## R CODE
>>> ## gaussian kernel
>>> gau<-function(z)
>>> {
>>>     k<- exp(-(z^2)/2)*(1/sqrt(2*pi))
>>>     k
>>> }
>>>
>>> ker<-c()
>>> for ( j in 1:length(x2))
>>> {  z<-(x2[j]-x)/h
>>>    ker[j]<-sum(w*gau(z))/(sum(w)*h)
>>> }
>>>
>>> Then we implemented similar ideas in the Spark shell:
>>>
>>> // SCALA-SPARK CODE
>>>
>>> def gau(z:Double)={
>>>     scala.math.exp(-scala.math.pow(z,2)/2)*(1/scala.math.sqrt(2*
>>> scala.math.Pi))
>>> }
>>>
>>> def kernel(x2j:Double,id_x:RDD[(java.lang.String, Double)] ,id_w:
>>> RDD[(java.lang.String, Double)] ,h:Double) = {
>>>     val z = id_x.mapValues(x=>(x2j-x)/h)
>>>
>>> z.mapValues(gau(_)).join(id_w).map(x=>(x._2._1*x._2._2)).sum/(id_w.map(x=>x._2).sum
>>> * h)
>>> }
>>>
>>> val ker = x2.map(kernel(_,id_x,id_w,h))
>>>
>>> The problem is that R is faster (3 sec) than Spark (30 min) on the same
>>> machine (quad core). I'm sure it is because we are not using Spark as
>>> well as possible.
>>>
>>> Can anyone help us?
>>>
>>> Abel Coronado
>>> @abxda
>>>
>>
>>
>
