Github user ConeyLiu commented on the issue:
https://github.com/apache/spark/pull/19317
@jiangxb1987, @WeichenXu123, thanks for your review. This change is
inspired by the `TODO` list. You can see the following code snippet:
```scala
// TODO: Calling aggregateByKey and collect creates two stages, we can implement something
// TODO: similar to reduceByKeyLocally to save one stage.
val aggregated = dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd
  .map { row => (row.getDouble(0), (row.getDouble(1), row.getAs[Vector](2)))
  }.aggregateByKey[(Double, DenseVector)]((0.0, Vectors.zeros(numFeatures).toDense))(
  seqOp = {
    case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
      requireValues(features)
      BLAS.axpy(weight, features, featureSum)
      (weightSum + weight, featureSum)
  },
  combOp = {
    case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
      BLAS.axpy(1.0, featureSum2, featureSum1)
      (weightSum1 + weightSum2, featureSum1)
  }).collect().sortBy(_._1)
```
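For reference, `reduceByKeyLocally` in `PairRDDFunctions` already avoids the second stage for the reduce case: each partition is folded into a local hash map, and the per-partition maps are merged on the driver, so everything runs as one job. Roughly (a simplified sketch, omitting Spark's closure cleaning):
```scala
import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._
import org.apache.spark.rdd.RDD

// Simplified sketch of how reduceByKeyLocally works: build a JHashMap per
// partition, then merge the maps on the driver via reduce (single job, no
// shuffle stage).
def reduceByKeyLocallySketch[K, V](rdd: RDD[(K, V)])(func: (V, V) => V): Map[K, V] = {
  val reducePartition = (iter: Iterator[(K, V)]) => {
    val map = new JHashMap[K, V]
    iter.foreach { case (k, v) =>
      if (map.containsKey(k)) map.put(k, func(map.get(k), v)) else map.put(k, v)
    }
    Iterator(map)
  }
  val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
    m2.asScala.foreach { case (k, v) =>
      if (m1.containsKey(k)) m1.put(k, func(m1.get(k), v)) else m1.put(k, v)
    }
    m1
  }
  rdd.mapPartitions(reducePartition).reduce(mergeMaps).asScala.toMap
}
```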
- The `aggregateByKeyLocally` we implemented is similar to `reduceByKeyLocally` (see the sketch after this list).
- I agree with your suggestion to use `OpenHashSet` instead of `JHashMap`. I can change it, and `reduceByKeyLocally` may also need the same change.
- Here we collect all the aggregated data to the driver and sort it. I think the data should be small, and the amount of data collected by the two implementations should be almost the same.
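A minimal sketch of the idea, following the same partition-local pattern as `reduceByKeyLocally` but with a zero value and separate `seqOp`/`combOp`. The helper name, the factory-style zero value, and the signature here are assumptions for illustration, not the exact code in this PR:
```scala
import java.util.{HashMap => JHashMap}
import scala.collection.JavaConverters._
import org.apache.spark.rdd.RDD

// Hypothetical sketch: aggregate values per key inside each partition, then
// merge the per-partition maps on the driver, so the whole computation is a
// single job instead of aggregateByKey (shuffle) followed by collect.
def aggregateByKeyLocally[K, V, U](rdd: RDD[(K, V)], zero: () => U)(
    seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
  val aggregatePartition = (iter: Iterator[(K, V)]) => {
    val map = new JHashMap[K, U]
    iter.foreach { case (k, v) =>
      // The zero value is a factory so a mutable zero (e.g. a fresh
      // DenseVector per label) is not shared across keys.
      val acc = if (map.containsKey(k)) map.get(k) else zero()
      map.put(k, seqOp(acc, v))
    }
    Iterator(map)
  }
  val mergeMaps = (m1: JHashMap[K, U], m2: JHashMap[K, U]) => {
    m2.asScala.foreach { case (k, u) =>
      if (m1.containsKey(k)) m1.put(k, combOp(m1.get(k), u)) else m1.put(k, u)
    }
    m1
  }
  rdd.mapPartitions(aggregatePartition).reduce(mergeMaps).asScala.toMap
}
```
With the label as the key, `() => (0.0, Vectors.zeros(numFeatures).toDense)` as the zero factory, and the same `seqOp`/`combOp` as in the snippet above, the NaiveBayes aggregation would run as one job, and the small per-label result can still be sorted on the driver.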