Hi all,
I implemented the TRON algorithm for logistic regression from Liblinear in
Spark. The method is a Newton method, so it converges in a relatively
small number of iterations, each of which costs only about 2x a gradient
computation. The distributed portion of the algorithm is a Hessian-vector
product, which can be computed as X^T (D (Xv)), where X is the data matrix
and D is a diagonal matrix. I've written it in the following way:
import org.apache.spark.rdd.RDD

// sigs contains the D scalars. The variables sc (the SparkContext) and data
// (an RDD of a container class for a data point) are in scope.
def getLossHessianVec(sigs: RDD[Double], v: Array[Double]): Array[Double] = {
  val vB = sc.broadcast(v)
  val accums = data.zipPartitions(sigs) { (dataIt, sigIt) =>
    val accum = Array.fill(vB.value.length)(0.0)
    dataIt.zip(sigIt).foreach { case (numData, sig) =>
      // compute the dot product x_i dot v and scale it by sig,
      // then accumulate this example into accum, scaled by that value
    }
    Iterator(accum)
  }.cache
  accums.count
  accums.reduce { (x, y) =>
    (0 until x.length).foreach(ii => x(ii) += y(ii))
    x
  }
}
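For reference, the loop body elided above computes, for each example x_i,
the scalar s_i = sig_i * (x_i dot v) and then adds s_i * x_i into accum,
which is exactly X^T (D (Xv)) accumulated example by example. A
single-machine sketch, assuming dense Array[Double] features (a
simplification; the real RDD holds a different container class):

// Single-machine sketch of the Hessian-vector product X^T (D (X v)),
// accumulated example by example, as the elided loop body does.
// Assumes each example is a dense Array[Double]; illustrative only.
def hessianVecLocal(xs: Array[Array[Double]], sigs: Array[Double],
                    v: Array[Double]): Array[Double] = {
  val accum = Array.fill(v.length)(0.0)
  for (i <- xs.indices) {
    val x = xs(i)
    var dot = 0.0                       // x_i dot v
    var j = 0
    while (j < v.length) { dot += x(j) * v(j); j += 1 }
    val s = sigs(i) * dot               // scale by the D entry for x_i
    j = 0
    while (j < v.length) { accum(j) += s * x(j); j += 1 }  // accum += s * x_i
  }
  accum
}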
Basically, the computation up to the count runs about like I would
expect: a few hundred milliseconds for a moderate dataset. However, the
final reduce generally takes 4-5 times longer, even with a small number of
workers (2 or 4), and with 200 workers the typical wall-clock time for the
reduce was around 10 seconds. The accums are moderately sized arrays of
around 3,000,000 Doubles each, i.e. roughly 24 MB per partition. This
isn't an issue of scaling so much as one of wall-clock time, since a
worker wouldn't be able to hold enough data to amortize the cost of the
reduces.
I originally had the code without the count statement; adding it made the
job run faster, presumably because it forces the cached accums to
materialize before the reduce. The reduction is ugly, but it was the
fastest I could find.
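If the Spark version at hand exposes RDD.treeReduce (an assumption on my
part; I have not verified it is in this release or that it helps here), a
tree-shaped combine might merge the partial arrays on the executors
instead of funneling all of them through the driver. A sketch:

// Hypothetical alternative: the same in-place element-wise add as above,
// but merged in a tree of depth 2 on the executors before the final
// result reaches the driver. Assumes accums.treeReduce exists.
val hv = accums.treeReduce({ (x, y) =>
  var ii = 0
  while (ii < x.length) { x(ii) += y(ii); ii += 1 }
  x
}, depth = 2)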
Any suggestions?
Thanks,
Tom