Github user srowen commented on a diff in the pull request:
https://github.com/apache/spark/pull/20396#discussion_r163941171
--- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala ---
@@ -421,13 +460,220 @@ private[evaluation] object SquaredEuclideanSilhouette {
       computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double)
     }
-    val silhouetteScore = dfWithSquaredNorm
-      .select(avg(
-        computeSilhouetteCoefficientUDF(
-          col(featuresCol), col(predictionCol).cast(DoubleType), col("squaredNorm"))
-      ))
-      .collect()(0)
-      .getDouble(0)
+    val silhouetteScore = overallScore(dfWithSquaredNorm,
+      computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType),
+        col("squaredNorm")))
+
+    bClustersStatsMap.destroy()
+
+    silhouetteScore
+  }
+}
+
+
+/**
+ * The algorithm implemented in this object, instead, is an efficient and parallel
+ * implementation of the Silhouette using the cosine distance measure. The cosine distance
+ * measure is defined as `1 - s` where `s` is the cosine similarity between two points.
+ *
+ * The total distance of the point `X` to the points `$C_{i}$` belonging to the cluster `$\Gamma$`
+ * is:
+ *
+ * <blockquote>
+ *   $$
+ *   \sum\limits_{i=1}^N d(X, C_{i} ) =
+ *   \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij} }{ \|X\|\|C_{i}\|} \Big)
+ *   = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|}
+ *   \frac{c_{ij}}{\|C_{i}\|}
+ *   = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N
+ *   \frac{c_{ij}}{\|C_{i}\|} \Big)
+ *   $$
+ * </blockquote>
+ *
+ * where `$x_{j}$` is the `j`-th dimension of the point `X` and `$c_{ij}$` is the `j`-th dimension
+ * of the `i`-th point in cluster `$\Gamma$`.
+ *
+ * Then, we can define the vector:
+ *
+ * <blockquote>
+ *   $$
+ *   \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D
+ *   $$
+ * </blockquote>
+ *
+ * which can be precomputed for each point, and the vector
+ *
+ * <blockquote>
+ *   $$
+ *   \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D
+ *   $$
+ * </blockquote>
+ *
+ * which can likewise be precomputed for each cluster `$\Gamma$` from its points `$C_{i}$`.
+ *
+ * With these definitions, the numerator of the average distance becomes:
+ *
+ * <blockquote>
+ *   $$
+ *   N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}
+ *   $$
+ * </blockquote>
+ *
+ * Thus the average distance of a point `X` to the points of the cluster `$\Gamma$` is:
+ *
+ * <blockquote>
+ *   $$
+ *   1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N}
+ *   $$
+ * </blockquote>
+ *
+ * In the implementation, the precomputed values for the clusters are distributed among the worker
+ * nodes via broadcast variables, because we can assume that the clusters are limited in number.
+ *
+ * The main strengths of this algorithm are the low computational complexity and the intrinsic
+ * parallelism. The precomputed information for each point and for each cluster can be computed
+ * with a computational complexity which is `O(N/W)`, where `N` is the number of points in the
+ * dataset and `W` is the number of worker nodes. After that, every point can be analyzed
+ * independently from the others.
+ *
+ * For every point we need to compute the average distance to all the clusters. Since the formula
+ * above requires `O(D)` operations, this phase has a computational complexity which is
+ * `O(C*D*N/W)` where `C` is the number of clusters (which we assume to be quite low), `D` is the
+ * number of dimensions, `N` is the number of points in the dataset and `W` is the number of
+ * worker nodes.
+ */
+private[evaluation] object CosineSilhouette extends Silhouette {
+
+  private[this] var kryoRegistrationPerformed: Boolean = false
+
+  private[this] val normalizedFeaturesColName = "normalizedFeatures"
+
+  /**
+   * This method registers the class
+   * [[org.apache.spark.ml.evaluation.CosineSilhouette.ClusterStats]]
+   * for kryo serialization.
+   *
+   * @param sc `SparkContext` to be used
+   */
+  def registerKryoClasses(sc: SparkContext): Unit = {
+    if (!kryoRegistrationPerformed) {
+      sc.getConf.registerKryoClasses(
+        Array(
+          classOf[CosineSilhouette.ClusterStats]
+        )
+      )
+      kryoRegistrationPerformed = true
+    }
+  }
+
+  case class ClusterStats(normalizedFeatureSum: Vector, numOfPoints: Long)
+
+  /**
+   * The method takes the input dataset and computes the aggregated values
+   * about a cluster which are needed by the algorithm.
+   *
+   * @param df The DataFrame which contains the input data
+   * @param predictionCol The name of the column which contains the predicted cluster id
+   *                      for the point.
+   * @return A [[scala.collection.immutable.Map]] which associates each cluster id to a
+   *         [[ClusterStats]] object (which contains the precomputed values `N` and
+   *         `$\Omega_{\Gamma}$`).
+   */
+  def computeClusterStats(df: DataFrame, predictionCol: String): Map[Double, ClusterStats] = {
+    val numFeatures = df.select(col(normalizedFeaturesColName)).first().getAs[Vector](0).size
+    val clustersStatsRDD = df.select(
+        col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName))
+      .rdd
+      .map { row => (row.getDouble(0), row.getAs[Vector](1)) }
+      .aggregateByKey[(DenseVector, Long)]((Vectors.zeros(numFeatures).toDense, 0L))(
+        seqOp = {
+          case ((normalizedFeaturesSum: DenseVector, numOfPoints: Long), (normalizedFeatures)) =>
+            BLAS.axpy(1.0, normalizedFeatures, normalizedFeaturesSum)
+            (normalizedFeaturesSum, numOfPoints + 1)
+        },
+        combOp = {
+          case ((normalizedFeaturesSum1, numOfPoints1), (normalizedFeaturesSum2, numOfPoints2)) =>
+            BLAS.axpy(1.0, normalizedFeaturesSum2, normalizedFeaturesSum1)
+            (normalizedFeaturesSum1, numOfPoints1 + numOfPoints2)
+        }
+      )
+
+    clustersStatsRDD
+      .collectAsMap()
+      .mapValues {
--- End diff ---
What about `mapValues` on the RDD before collecting, or does that not work?
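For reference, the suggested change could look roughly like this (an untested sketch reusing the names from the hunk above, not part of the patch): the conversion to `ClusterStats` happens on the executors via `RDD.mapValues`, and only the converted values are collected.

```scala
// Rough sketch of the suggestion: convert the aggregated
// (normalizedFeaturesSum, numOfPoints) pairs to ClusterStats before collecting.
clustersStatsRDD
  .mapValues { case (normalizedFeaturesSum, numOfPoints) =>
    ClusterStats(normalizedFeaturesSum, numOfPoints)
  }
  .collectAsMap()  // scala.collection.Map[Double, ClusterStats]
  .toMap           // to the immutable Map the method signature declares
```

Either way roughly the same data should end up serialized to the driver, since `ClusterStats` just wraps the same two fields; the question is only where the wrapping happens.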
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]