Github user yanboliang commented on a diff in the pull request:
https://github.com/apache/spark/pull/18538#discussion_r137226104
--- Diff:
mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala
---
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.evaluation
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.{Experimental, Since}
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors,
VectorUDT}
+import org.apache.spark.ml.param.ParamMap
+import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol}
+import org.apache.spark.ml.util.{DefaultParamsReadable,
DefaultParamsWritable, Identifiable, SchemaUtils}
+import org.apache.spark.sql.{DataFrame, Dataset}
+import org.apache.spark.sql.functions.{avg, col, udf}
+import org.apache.spark.sql.types.IntegerType
+
+/**
+ * :: Experimental ::
+ * Evaluator for clustering results.
+ * The metric computes the Silhouette measure
+ * using the squared Euclidean distance.
+ *
+ * The Silhouette is a measure for the validation
+ * of the consistency within clusters. It ranges
+ * between 1 and -1, where a value close to 1
+ * means that the points in a cluster are close
+ * to the other points in the same cluster and
+ * far from the points of the other clusters.
+ */
+@Experimental
+@Since("2.3.0")
+class ClusteringEvaluator (val uid: String)
+ extends Evaluator with HasPredictionCol with HasFeaturesCol with
DefaultParamsWritable {
+
+ def this() = this(Identifiable.randomUID("cluEval"))
+
+ override def copy(pMap: ParamMap): ClusteringEvaluator =
this.defaultCopy(pMap)
+
+ override def isLargerBetter: Boolean = true
+
+ /** @group setParam */
+ @Since("2.3.0")
+ def setPredictionCol(value: String): this.type = set(predictionCol,
value)
+
+ /** @group setParam */
+ @Since("2.3.0")
+ def setFeaturesCol(value: String): this.type = set(featuresCol, value)
+
+ @Since("2.3.0")
+ override def evaluate(dataset: Dataset[_]): Double = {
+ SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new
VectorUDT)
+ SchemaUtils.checkColumnType(dataset.schema, $(predictionCol),
IntegerType)
+
+ SquaredEuclideanSilhouette.computeSilhouetteScore(
+ dataset,
+ $(predictionCol),
+ $(featuresCol)
+ )
+ }
+}
+
+
+object ClusteringEvaluator
+ extends DefaultParamsReadable[ClusteringEvaluator] {
+
+ override def load(path: String): ClusteringEvaluator = super.load(path)
+
+}
+
+
+/**
+ * SquaredEuclideanSilhouette computes the average of the
+ * Silhouette over all the data of the dataset, which is
+ * a measure of how appropriately the data have been clustered.
+ *
+ * The Silhouette for each point `i` is defined as:
+ *
+ * <blockquote>
+ * $$
+ * s_{i} = \frac{b_{i}-a_{i}}{max\{a_{i},b_{i}\}}
+ * $$
+ * </blockquote>
+ *
+ * which can be rewritten as
+ *
+ * <blockquote>
+ * $$
+ * s_{i}= \begin{cases}
+ * 1-\frac{a_{i}}{b_{i}} & \text{if } a_{i} \leq b_{i} \\
+ * \frac{b_{i}}{a_{i}}-1 & \text{if } a_{i} \gt b_{i} \end{cases}
+ * $$
+ * </blockquote>
+ *
+ * where `$a_{i}$` is the average dissimilarity of `i` with all other data
+ * within the same cluster, `$b_{i}$` is the lowest average dissimilarity
+ * of to any other cluster, of which `i` is not a member.
+ * `$a_{i}$` can be interpreted as as how well `i` is assigned to its
cluster
+ * (the smaller the value, the better the assignment), while `$b_{i}$` is
+ * a measure of how well `i` has not been assigned to its "neighboring
cluster",
+ * ie. the nearest cluster to `i`.
+ *
+ * Unfortunately, the naive implementation of the algorithm requires to
compute
+ * the distance of each couple of points in the dataset. Since the
computation of
+ * the distance measure takes `D` operations - if `D` is the number of
dimensions
+ * of each point, the computational complexity of the algorithm is
`O(N^2^*D)`, where
+ * `N` is the cardinality of the dataset. Of course this is not scalable
in `N`,
+ * which is the critical number in a Big Data context.
+ *
+ * The algorithm which is implemented in this object, instead, is an
efficient
+ * and parallel implementation of the Silhouette using the squared
Euclidean
+ * distance measure.
+ *
+ * With this assumption, the average of the distance of the point `X`
+ * to the points `$C_{i}$` belonging to the cluster `$\Gamma$` is:
+ *
+ * <blockquote>
+ * $$
+ * \sum\limits_{i=1}^N d(X, C_{i} )^2 =
+ * \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D (x_{j}-c_{ij})^2 \Big)
+ * = \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D x_{j}^2 +
+ * \sum\limits_{j=1}^D c_{ij}^2 -2\sum\limits_{j=1}^D x_{i}c_{ij} \Big)
+ * = \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 +
+ * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2
+ * -2 \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{i}c_{ij}
+ * $$
+ * </blockquote>
+ *
+ * where `$x_{j}$` is the `j`-th dimension of the point `X` and
+ * `$c_{ij}$` is the `j`-th dimension of the `i`-th point in cluster
`$\Gamma$`.
+ *
+ * Then, the first term of the equation can be rewritten as:
+ *
+ * <blockquote>
+ * $$
+ * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 = N \xi_{X} \text{ ,
+ * with } \xi_{X} = \sum\limits_{j=1}^D x_{j}^2
+ * $$
+ * </blockquote>
+ *
+ * where `$\xi_{X}$` is fixed for each point and it can be precomputed.
+ *
+ * Moreover, the second term is fixed for each cluster too,
+ * thus we can name it `$\Psi_{\Gamma}$`
+ *
+ * <blockquote>
+ * $$
+ * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2 =
+ * \sum\limits_{i=1}^N \xi_{C_{i}} = \Psi_{\Gamma}
+ * $$
+ * </blockquote>
+ *
+ * Last, the third element becomes
+ *
+ * <blockquote>
+ * $$
+ * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{i}c_{ij} =
+ * \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{i}
+ * $$
+ * </blockquote>
+ *
+ * thus defining the vector
+ *
+ * <blockquote>
+ * $$
+ * Y_{\Gamma}:Y_{\Gamma j} = \sum\limits_{i=1}^N c_{ij} , j=0, ..., D
+ * $$
+ * </blockquote>
+ *
+ * which is fixed for each cluster `$\Gamma$`, we have
+ *
+ * <blockquote>
+ * $$
+ * \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{i} =
+ * \sum\limits_{j=1}^D Y_{\Gamma j} x_{i}
+ * $$
+ * </blockquote>
+ *
+ * In this way, the previous equation becomes
+ *
+ * <blockquote>
+ * $$
+ * N\xi_{X} + \Psi_{\Gamma} - 2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{i}
+ * $$
+ * </blockquote>
+ *
+ * and the distance of a point to a cluster can be computed as
+ *
+ * <blockquote>
+ * $$
+ * \frac{\sum\limits_{i=1}^N d(X, C_{i} )^2}{N} =
+ * \frac{N\xi_{X} + \Psi_{\Gamma} - 2 \sum\limits_{j=1}^D Y_{\Gamma j}
x_{i}}{N} =
+ * \xi_{X} + \frac{\Psi_{\Gamma} }{N} - 2 \frac{\sum\limits_{j=1}^D
Y_{\Gamma j} x_{i}}{N}
+ * $$
+ * </blockquote>
+ *
+ * Thus, it is enough to precompute the constant `$\xi_{X}$` for each
point `X`
+ * and the constants `$\Psi_{\Gamma}$` and `N` and the vector
`$Y_{\Gamma}$` for
+ * each cluster `$\Gamma$`.
+ *
+ * In the implementation, the precomputed values for the clusters
+ * are distributed among the worker nodes via broadcasted variables,
+ * because we can assume that the clusters are limited in number and
+ * anyway they are much fewer than the points.
+ *
+ * The main strengths of this algorithm are the low computational
complexity
+ * and the intrinsic parallelism. The precomputed information for each
point
+ * and for each cluster can be computed with a computational complexity
+ * which is `O(N/W)`, where `N` is the number of points in the dataset and
+ * `W` is the number of worker nodes. After that, every point can be
+ * analyzed independently of the others.
+ *
+ * For every point we need to compute the average distance to all the
clusters.
+ * Since the formula above requires `O(D)` operations, this phase has a
+ * computational complexity which is `O(C*D*N/W)` where `C` is the number
of
+ * clusters (which we assume quite low), `D` is the number of dimensions,
+ * `N` is the number of points in the dataset and `W` is the number
+ * of worker nodes.
+ */
+private[evaluation] object SquaredEuclideanSilhouette {
+
+ private[this] var kryoRegistrationPerformed: Boolean = false
+
+ /**
+ * This method registers the class
+ *
[[org.apache.spark.ml.evaluation.SquaredEuclideanSilhouette.ClusterStats]]
+ * for kryo serialization.
+ *
+ * @param sc `SparkContext` to be used
+ */
+ def registerKryoClasses(sc: SparkContext): Unit = {
+ if (! kryoRegistrationPerformed) {
+ sc.getConf.registerKryoClasses(
+ Array(
+ classOf[SquaredEuclideanSilhouette.ClusterStats]
+ )
+ )
+ kryoRegistrationPerformed = true
+ }
+ }
+
+ case class ClusterStats(featureSum: Vector, squaredNormSum: Double,
numOfPoints: Long)
+
+ /**
+ * The method takes the input dataset and computes the aggregated values
+ * about a cluster which are needed by the algorithm.
+ *
+ * @param df The DataFrame which contains the input data
+ * @param predictionCol The name of the column which contains the
cluster id for the point.
+ * @param featuresCol The name of the column which contains the feature
vector of the point.
+ * @return A [[scala.collection.immutable.Map]] which associates each
cluster id
+ * to a [[ClusterStats]] object (which contains the precomputed
values `N`,
+ * `\Psi_{\Gamma}` and `Y_{\Gamma}` for a cluster).
--- End diff --
```\Psi_{\Gamma}``` and ```Y_{\Gamma}``` should be surrounded with ```$```
to get correct mathematical symbol in generated doc.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]