# [GitHub] spark pull request #20396: [SPARK-23217][ML] Add cosine distance measure to ...

Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20396#discussion_r167581903

--- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala ---
@@ -421,13 +456,220 @@ private[evaluation] object SquaredEuclideanSilhouette {
Double, _: Double)
}

-    val silhouetteScore = dfWithSquaredNorm
-      .select(avg(
-        computeSilhouetteCoefficientUDF(
-          col(featuresCol), col(predictionCol).cast(DoubleType), col("squaredNorm"))
-      ))
-      .collect()(0)
-      .getDouble(0)
+    val silhouetteScore = overallScore(dfWithSquaredNorm,
+      computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType),
+        col("squaredNorm")))
+
+
+    silhouetteScore
+  }
+}
+
+
+/**
+ * The algorithm which is implemented in this object, instead, is an efficient and parallel
+ * implementation of the Silhouette using the cosine distance measure. The cosine distance
+ * measure is defined as 1 - s where s is the cosine similarity between two points.
+ *
+ * The total distance of the point X to the points $C_{i}$ belonging to the cluster $\Gamma$
+ * is:
+ *
+ * <blockquote>
+ *   $$
+ *   \sum\limits_{i=1}^N d(X, C_{i} ) =
+ *   \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij} }{ \|X\|\|C_{i}\|} \Big)
+ *   = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|}
+ *   \frac{c_{ij}}{\|C_{i}\|}
+ *   = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N
+ *   \frac{c_{ij}}{\|C_{i}\|} \Big)
+ *   $$
+ * </blockquote>
+ *
+ * where $x_{j}$ is the j-th dimension of the point X and $c_{ij}$ is the j-th dimension
+ * of the i-th point in cluster $\Gamma$.
+ *
+ * Then, we can define the vector:
+ *
+ * <blockquote>
+ *   $$
+ *   \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D
+ *   $$
+ * </blockquote>
+ *
+ * which can be precomputed for each point and the vector
+ *
+ * <blockquote>
+ *   $$
+ *   \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D
+ *   $$
+ * </blockquote>
+ *
+ * which can also be precomputed for each cluster $\Gamma$ from its points $C_{i}$.
+ *
+ * With these definitions, the total distance (the numerator of the average) becomes:
+ *
+ * <blockquote>
+ *   $$
+ *   N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}
+ *   $$
+ * </blockquote>
+ *
+ * Thus the average distance of a point X to the points of the cluster $\Gamma$ is:
+ *
+ * <blockquote>
+ *   $$
+ *   1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N}
+ *   $$
+ * </blockquote>
+ *
+ * In the implementation, the precomputed values for the clusters are distributed among the worker
+ * nodes via broadcast variables, because we can assume that the number of clusters is small.
+ *
+ * The main strengths of this algorithm are the low computational complexity and the intrinsic
+ * parallelism. The precomputed information for each point and for each cluster can be computed
+ * with a computational complexity which is O(N/W), where N is the number of points in the
+ * dataset and W is the number of worker nodes. After that, every point can be analyzed
+ * independently of the others.
+ *
+ * For every point we need to compute the average distance to all the clusters. Since the formula
+ * above requires O(D) operations, this phase has a computational complexity which is
+ * O(C*D*N/W) where C is the number of clusters (which we assume to be small), D is the number
+ * of dimensions, N is the number of points in the dataset and W is the number of worker
+ * nodes.
+ */
+private[evaluation] object CosineSilhouette extends Silhouette {
+
+  private[this] var kryoRegistrationPerformed: Boolean = false
+
+  private[this] val normalizedFeaturesColName = "normalizedFeatures"
+
+  /**
+   * This method registers the class
+   * [[org.apache.spark.ml.evaluation.CosineSilhouette.ClusterStats]]
+   * for kryo serialization.
+   *
+   * @param sc SparkContext to be used
+   */
+  def registerKryoClasses(sc: SparkContext): Unit = {
+    if (!kryoRegistrationPerformed) {
+      sc.getConf.registerKryoClasses(
+        Array(
+          classOf[CosineSilhouette.ClusterStats]
+        )
+      )
+      kryoRegistrationPerformed = true
+    }
+  }
+
+  case class ClusterStats(normalizedFeatureSum: Vector, numOfPoints: Long)
--- End diff --

Back on this -- how about just using a Tuple2 of Vector, Long? no new class
to register. 
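The suggestion above can be sketched without Spark. The object below is a hypothetical, plain-Scala stand-in, not the PR's code: `Array[Double]` replaces Spark's `Vector`, and a local `groupBy`/`reduce` replaces the distributed aggregation. The per-cluster statistics are just a `(Array[Double], Long)` pair holding the sum of the normalized features and the number of points, merged component-wise.

```scala
// Hypothetical sketch: per-cluster statistics as a plain (Array[Double], Long) tuple,
// i.e. (sum of normalized features, number of points), instead of a ClusterStats class.
// Array[Double] stands in for Spark's ml.linalg.Vector; no Spark dependency is used.
object TupleClusterStats {

  type Stats = (Array[Double], Long)

  /** Scales a point to unit L2 norm (the xi vector from the Scaladoc). */
  def normalize(x: Array[Double]): Array[Double] = {
    val norm = math.sqrt(x.map(v => v * v).sum)
    x.map(_ / norm)
  }

  /** Merges two partial statistics: component-wise sum of vectors, sum of counts. */
  def merge(a: Stats, b: Stats): Stats =
    (a._1.zip(b._1).map { case (u, v) => u + v }, a._2 + b._2)

  /** Builds the per-cluster statistics from (prediction, features) pairs. */
  def clusterStats(points: Seq[(Double, Array[Double])]): Map[Double, Stats] =
    points
      .map { case (cluster, features) => (cluster, (normalize(features), 1L)) }
      .groupBy(_._1)
      .map { case (cluster, entries) => cluster -> entries.map(_._2).reduce(merge) }
}
```

Because the statistics are a standard tuple, there is no extra class to register with Kryo; the trade-off is that call sites read `_1` and `_2` instead of named fields.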

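The key step in the Scaladoc of the diff is that the total cosine distance from X to the points of a cluster collapses to $N - \sum_j \xi_{X j} \Omega_{\Gamma j}$. A quick numerical check in plain Scala (hypothetical helper names, arrays instead of Spark vectors) compares the direct O(N*D) sum with the O(D) form that uses a precomputed $\Omega_{\Gamma}$. Dividing either side by N gives the average distance $1 - \xi_X \cdot \Omega_\Gamma / N$.

```scala
// Hypothetical numerical check of the identity
//   sum_i d(X, C_i) = N - xi_X . Omega_Gamma
// where d is the cosine distance 1 - cos(X, C_i). Plain Scala, no Spark.
object CosineIdentityCheck {

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (u, v) => u * v }.sum

  def norm(a: Array[Double]): Double = math.sqrt(dot(a, a))

  /** Cosine distance: 1 minus the cosine similarity. */
  def cosineDistance(a: Array[Double], b: Array[Double]): Double =
    1.0 - dot(a, b) / (norm(a) * norm(b))

  /** xi_X: the point scaled to unit norm. */
  def xi(a: Array[Double]): Array[Double] = {
    val n = norm(a)
    a.map(_ / n)
  }

  /** Omega_Gamma: component-wise sum of the xi vectors of the cluster's points. */
  def omega(cluster: Seq[Array[Double]]): Array[Double] =
    cluster.map(xi).reduce((u, v) => u.zip(v).map { case (p, q) => p + q })

  /** Left-hand side: direct sum of distances, O(N * D). */
  def totalDistanceDirect(x: Array[Double], cluster: Seq[Array[Double]]): Double =
    cluster.map(c => cosineDistance(x, c)).sum

  /** Right-hand side: O(D) once Omega_Gamma and N are known. */
  def totalDistanceFromOmega(x: Array[Double], omegaGamma: Array[Double], n: Long): Double =
    n - dot(xi(x), omegaGamma)
}
```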

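Building on the Scaladoc's derivation, a per-point silhouette only needs $\xi_X$ and the small per-cluster statistics, which is what makes broadcasting them practical. The sketch below is a plain-Scala illustration under assumed names, not the PR's code: it computes the standard silhouette $(b - a) / \max(a, b)$ from precomputed $(\Omega_\Gamma, N)$ pairs in O(C*D) per point and compares it with a brute-force version. It assumes at least two clusters, excludes X from its own cluster's average (using $d(X, X) = 0$), and assigns 0 to singleton clusters, following the usual silhouette convention.

```scala
// Hypothetical sketch (plain Scala, no Spark): per-point cosine silhouette computed from
// precomputed per-cluster (Omega, N) statistics, checked against a brute-force version.
object CosineSilhouetteSketch {

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (u, v) => u * v }.sum

  /** xi_X: the point scaled to unit L2 norm. */
  def xi(a: Array[Double]): Array[Double] = {
    val n = math.sqrt(dot(a, a))
    a.map(_ / n)
  }

  def cosineDistance(a: Array[Double], b: Array[Double]): Double = 1.0 - dot(xi(a), xi(b))

  /** Per-cluster (Omega, N), computed in a single pass over the points. */
  def clusterStats(points: Seq[(Int, Array[Double])]): Map[Int, (Array[Double], Long)] =
    points.groupBy(_._1).map { case (k, ps) =>
      val omega = ps.map(p => xi(p._2)).reduce((u, v) => u.zip(v).map { case (s, t) => s + t })
      k -> ((omega, ps.size.toLong))
    }

  /** Silhouette value from the own-cluster (a) and nearest-other-cluster (b) averages. */
  def score(a: Double, b: Double): Double =
    if (a < b) 1.0 - a / b else if (a > b) b / a - 1.0 else 0.0

  /** O(C * D) per point: needs only xi_X and the small per-cluster statistics. */
  def silhouette(x: Array[Double], cluster: Int, stats: Map[Int, (Array[Double], Long)]): Double = {
    val xiX = xi(x)
    val (omegaOwn, nOwn) = stats(cluster)
    if (nOwn == 1L) 0.0 // a singleton cluster gets silhouette 0 by convention
    else {
      // d(X, X) = 0, so X adds nothing to its own-cluster total; average over the N - 1 others.
      val a = (nOwn - dot(xiX, omegaOwn)) / (nOwn - 1)
      // Average distance to each other cluster is 1 - xi_X . Omega / N (needs >= 2 clusters).
      val b = stats.toSeq.collect { case (k, (omega, n)) if k != cluster =>
        1.0 - dot(xiX, omega) / n
      }.min
      score(a, b)
    }
  }

  /** Brute-force reference: O(N * D) per point. */
  def silhouetteNaive(i: Int, points: IndexedSeq[(Int, Array[Double])]): Double = {
    val (ci, x) = points(i)
    val others = points.indices.filter(_ != i).map(points(_))
    val own = others.filter(_._1 == ci)
    if (own.isEmpty) 0.0
    else {
      val a = own.map(p => cosineDistance(x, p._2)).sum / own.size
      val b = others.filter(_._1 != ci).groupBy(_._1).values
        .map(ps => ps.map(p => cosineDistance(x, p._2)).sum / ps.size)
        .min
      score(a, b)
    }
  }
}
```

Averaging `silhouette` over all points yields the overall score, which is what the removed `select(avg(...))` lines in the diff computed.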