Github user mgaido91 commented on a diff in the pull request:
https://github.com/apache/spark/pull/20396#discussion_r164059420
--- Diff:
mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
---
@@ -421,13 +460,220 @@ private[evaluation] object SquaredEuclideanSilhouette {
computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double)
}
- val silhouetteScore = dfWithSquaredNorm
- .select(avg(
- computeSilhouetteCoefficientUDF(
- col(featuresCol), col(predictionCol).cast(DoubleType), col("squaredNorm"))
- ))
- .collect()(0)
- .getDouble(0)
+ val silhouetteScore = overallScore(dfWithSquaredNorm,
+ computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType),
+ col("squaredNorm")))
+
+ bClustersStatsMap.destroy()
+
+ silhouetteScore
+ }
+}
+
+
+/**
+ * The algorithm which is implemented in this object, instead, is an efficient and parallel
+ * implementation of the Silhouette using the cosine distance measure. The cosine distance
+ * measure is defined as `1 - s` where `s` is the cosine similarity between two points.
+ *
+ * The total distance of the point `X` to the points `$C_{i}$` belonging to the cluster `$\Gamma$`
+ * is:
+ *
+ * <blockquote>
+ * $$
+ * \sum\limits_{i=1}^N d(X, C_{i} ) =
+ * \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij} }{ \|X\|\|C_{i}\|} \Big)
+ * = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|}
+ * \frac{c_{ij}}{\|C_{i}\|}
+ * = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N
+ * \frac{c_{ij}}{\|C_{i}\|} \Big)
+ * $$
+ * </blockquote>
+ *
+ * where `$x_{j}$` is the `j`-th dimension of the point `X` and `$c_{ij}$` is the `j`-th dimension
+ * of the `i`-th point in cluster `$\Gamma$`.
+ *
+ * Then, we can define the vector:
+ *
+ * <blockquote>
+ * $$
+ * \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D
+ * $$
+ * </blockquote>
+ *
+ * which can be precomputed for each point and the vector
+ *
+ * <blockquote>
+ * $$
+ * \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D
+ * $$
+ * </blockquote>
+ *
+ * which can be precomputed too for each cluster `$\Gamma$` by its points `$C_{i}$`.
+ *
+ * With these definitions, the numerator becomes:
+ *
+ * <blockquote>
+ * $$
+ * N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}
+ * $$
+ * </blockquote>
+ *
+ * Thus the average distance of a point `X` to the points of the cluster `$\Gamma$` is:
+ *
+ * <blockquote>
+ * $$
+ * 1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N}
+ * $$
+ * </blockquote>
+ *
+ * In the implementation, the precomputed values for the clusters are distributed among the worker
+ * nodes via broadcasted variables, because we can assume that the clusters are limited in number.
+ *
+ * The main strengths of this algorithm are the low computational complexity and the intrinsic
+ * parallelism. The precomputed information for each point and for each cluster can be computed
+ * with a computational complexity which is `O(N/W)`, where `N` is the number of points in the
+ * dataset and `W` is the number of worker nodes. After that, every point can be analyzed
+ * independently from the others.
+ *
+ * For every point we need to compute the average distance to all the clusters. Since the formula
+ * above requires `O(D)` operations, this phase has a computational complexity which is
+ * `O(C*D*N/W)` where `C` is the number of clusters (which we assume quite low), `D` is the number
+ * of dimensions, `N` is the number of points in the dataset and `W` is the number of worker
+ * nodes.
+ */
+private[evaluation] object CosineSilhouette extends Silhouette {
+
+ private[this] var kryoRegistrationPerformed: Boolean = false
+
+ private[this] val normalizedFeaturesColName = "normalizedFeatures"
+
+ /**
+ * This method registers the class
+ * [[org.apache.spark.ml.evaluation.CosineSilhouette.ClusterStats]]
+ * for kryo serialization.
+ *
+ * @param sc `SparkContext` to be used
+ */
+ def registerKryoClasses(sc: SparkContext): Unit = {
+ if (!kryoRegistrationPerformed) {
+ sc.getConf.registerKryoClasses(
+ Array(
+ classOf[CosineSilhouette.ClusterStats]
+ )
+ )
+ kryoRegistrationPerformed = true
+ }
+ }
+
+ case class ClusterStats(normalizedFeatureSum: Vector, numOfPoints: Long)
+
+ /**
+ * The method takes the input dataset and computes the aggregated values
+ * about a cluster which are needed by the algorithm.
+ *
+ * @param df The DataFrame which contains the input data
+ * @param predictionCol The name of the column which contains the predicted cluster id
+ * for the point.
+ * @return A [[scala.collection.immutable.Map]] which associates each cluster id to a
+ * [[ClusterStats]] object (which contains the precomputed values `N` and
+ * `$\Omega_{\Gamma}$`).
+ */
+ def computeClusterStats(df: DataFrame, predictionCol: String): Map[Double, ClusterStats] = {
+ val numFeatures = df.select(col(normalizedFeaturesColName)).first().getAs[Vector](0).size
+ val clustersStatsRDD = df.select(
+ col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName))
+ .rdd
+ .map { row => (row.getDouble(0), row.getAs[Vector](1)) }
+ .aggregateByKey[(DenseVector, Long)]((Vectors.zeros(numFeatures).toDense, 0L))(
+ seqOp = {
+ case ((normalizedFeaturesSum: DenseVector, numOfPoints: Long), (normalizedFeatures)) =>
+ BLAS.axpy(1.0, normalizedFeatures, normalizedFeaturesSum)
+ (normalizedFeaturesSum, numOfPoints + 1)
+ },
+ combOp = {
+ case ((normalizedFeaturesSum1, numOfPoints1), (normalizedFeaturesSum2, numOfPoints2)) =>
+ BLAS.axpy(1.0, normalizedFeaturesSum2, normalizedFeaturesSum1)
+ (normalizedFeaturesSum1, numOfPoints1 + numOfPoints2)
+ }
+ )
+
+ clustersStatsRDD
+ .collectAsMap()
+ .mapValues {
--- End diff --
That would work too, but it seems rather pointless to me: here we are dealing
with the clusters, and they are assumed to be few, usually << 100.
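
As a side note on the doc comment in the diff: the identity it relies on (the average cosine distance to a cluster equals `1 - (xi_X . Omega_Gamma) / N`) can be checked with a minimal plain-Scala sketch. This is not the PR's code. It uses no Spark, assumes non-zero vectors, and every name in it (`CosineSilhouetteSketch`, `normalize`, `omega`, `avgDistance`, `avgDistanceBruteForce`) is hypothetical and chosen only for illustration.

```scala
// Plain-Scala sketch (no Spark); all names are hypothetical, not from the PR.
// Assumes every vector is non-zero, otherwise normalization divides by zero.
object CosineSilhouetteSketch {
  type Vec = Array[Double]

  def norm(v: Vec): Double = math.sqrt(v.map(x => x * x).sum)

  // xi_X: the point scaled to unit length.
  def normalize(v: Vec): Vec = {
    val n = norm(v)
    v.map(_ / n)
  }

  def dot(a: Vec, b: Vec): Double = a.zip(b).map { case (x, y) => x * y }.sum

  // Omega_Gamma: element-wise sum of the normalized points of one cluster.
  def omega(cluster: Seq[Vec]): Vec =
    cluster.map(normalize).reduce((a, b) => a.zip(b).map { case (x, y) => x + y })

  // Average cosine distance via the precomputed vector: 1 - (xi_X . Omega_Gamma) / N.
  def avgDistance(x: Vec, omegaGamma: Vec, n: Long): Double =
    1.0 - dot(normalize(x), omegaGamma) / n

  // Brute-force average of (1 - cosine similarity) over the cluster, for comparison.
  def avgDistanceBruteForce(x: Vec, cluster: Seq[Vec]): Double =
    cluster.map(c => 1.0 - dot(x, c) / (norm(x) * norm(c))).sum / cluster.size

  def main(args: Array[String]): Unit = {
    val cluster = Seq(Array(1.0, 0.0), Array(1.0, 1.0), Array(0.0, 2.0))
    val x = Array(3.0, 4.0)
    val fast = avgDistance(x, omega(cluster), cluster.size)
    val slow = avgDistanceBruteForce(x, cluster)
    // The two results should agree up to floating-point rounding.
    assert(math.abs(fast - slow) < 1e-12)
    println(s"fast=$fast slow=$slow")
  }
}
```

The per-cluster state in the sketch is one `D`-sized vector plus a count, which is why collecting it to the driver is cheap when the clusters are few.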
---