Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20396#discussion_r164059420
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala ---
    @@ -421,13 +460,220 @@ private[evaluation] object SquaredEuclideanSilhouette {
           computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double)
         }
     
    -    val silhouetteScore = dfWithSquaredNorm
    -      .select(avg(
    -        computeSilhouetteCoefficientUDF(
    -          col(featuresCol), col(predictionCol).cast(DoubleType), col("squaredNorm"))
    -      ))
    -      .collect()(0)
    -      .getDouble(0)
    +    val silhouetteScore = overallScore(dfWithSquaredNorm,
    +      computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType),
    +        col("squaredNorm")))
    +
    +    bClustersStatsMap.destroy()
    +
    +    silhouetteScore
    +  }
    +}
    +
    +
    +/**
    + * The algorithm which is implemented in this object, instead, is an efficient and parallel
    + * implementation of the Silhouette using the cosine distance measure. The cosine distance
    + * measure is defined as `1 - s` where `s` is the cosine similarity between two points.
    + *
    + * The total distance of the point `X` to the points `$C_{i}$` belonging to the cluster `$\Gamma$`
    + * is:
    + *
    + * <blockquote>
    + *   $$
    + *   \sum\limits_{i=1}^N d(X, C_{i} ) =
    + *   \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij} }{ \|X\|\|C_{i}\|} \Big)
    + *   = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|}
    + *   \frac{c_{ij}}{\|C_{i}\|}
    + *   = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N
    + *   \frac{c_{ij}}{\|C_{i}\|} \Big)
    + *   $$
    + * </blockquote>
    + *
    + * where `$x_{j}$` is the `j`-th dimension of the point `X` and `$c_{ij}$` is the `j`-th dimension
    + * of the `i`-th point in cluster `$\Gamma$`.
    + *
    + * Then, we can define the vector:
    + *
    + * <blockquote>
    + *   $$
    + *   \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D
    + *   $$
    + * </blockquote>
    + *
    + * which can be precomputed for each point and the vector
    + *
    + * <blockquote>
    + *   $$
    + *   \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D
    + *   $$
    + * </blockquote>
    + *
    + * which can be precomputed too for each cluster `$\Gamma$` by its points `$C_{i}$`.
    + *
    + * With these definitions, the numerator becomes:
    + *
    + * <blockquote>
    + *   $$
    + *   N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}
    + *   $$
    + * </blockquote>
    + *
    + * Thus the average distance of a point `X` to the points of the cluster `$\Gamma$` is:
    + *
    + * <blockquote>
    + *   $$
    + *   1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N}
    + *   $$
    + * </blockquote>
    + *
    + * In the implementation, the precomputed values for the clusters are distributed among the worker
    + * nodes via broadcasted variables, because we can assume that the clusters are limited in number.
    + *
    + * The main strengths of this algorithm are the low computational complexity and the intrinsic
    + * parallelism. The precomputed information for each point and for each cluster can be computed
    + * with a computational complexity which is `O(N/W)`, where `N` is the number of points in the
    + * dataset and `W` is the number of worker nodes. After that, every point can be analyzed
    + * independently from the others.
    + *
    + * For every point we need to compute the average distance to all the clusters. Since the formula
    + * above requires `O(D)` operations, this phase has a computational complexity which is
    + * `O(C*D*N/W)` where `C` is the number of clusters (which we assume quite low), `D` is the number
    + * of dimensions, `N` is the number of points in the dataset and `W` is the number of worker
    + * nodes.
    + */
    +private[evaluation] object CosineSilhouette extends Silhouette {
    +
    +  private[this] var kryoRegistrationPerformed: Boolean = false
    +
    +  private[this] val normalizedFeaturesColName = "normalizedFeatures"
    +
    +  /**
    +   * This method registers the class
    +   * [[org.apache.spark.ml.evaluation.CosineSilhouette.ClusterStats]]
    +   * for kryo serialization.
    +   *
    +   * @param sc `SparkContext` to be used
    +   */
    +  def registerKryoClasses(sc: SparkContext): Unit = {
    +    if (!kryoRegistrationPerformed) {
    +      sc.getConf.registerKryoClasses(
    +        Array(
    +          classOf[CosineSilhouette.ClusterStats]
    +        )
    +      )
    +      kryoRegistrationPerformed = true
    +    }
    +  }
    +
    +  case class ClusterStats(normalizedFeatureSum: Vector, numOfPoints: Long)
    +
    +  /**
    +   * The method takes the input dataset and computes the aggregated values
    +   * about a cluster which are needed by the algorithm.
    +   *
    +   * @param df The DataFrame which contains the input data
    +   * @param predictionCol The name of the column which contains the predicted cluster id
    +   *                      for the point.
    +   * @return A [[scala.collection.immutable.Map]] which associates each cluster id to a
    +   *         [[ClusterStats]] object (which contains the precomputed values `N` and
    +   *         `$\Omega_{\Gamma}$`).
    +   */
    +  def computeClusterStats(df: DataFrame, predictionCol: String): Map[Double, ClusterStats] = {
    +    val numFeatures = df.select(col(normalizedFeaturesColName)).first().getAs[Vector](0).size
    +    val clustersStatsRDD = df.select(
    +      col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName))
    +      .rdd
    +      .map { row => (row.getDouble(0), row.getAs[Vector](1)) }
    +      .aggregateByKey[(DenseVector, Long)]((Vectors.zeros(numFeatures).toDense, 0L))(
    +      seqOp = {
    +        case ((normalizedFeaturesSum: DenseVector, numOfPoints: Long), (normalizedFeatures)) =>
    +          BLAS.axpy(1.0, normalizedFeatures, normalizedFeaturesSum)
    +          (normalizedFeaturesSum, numOfPoints + 1)
    +      },
    +      combOp = {
    +        case ((normalizedFeaturesSum1, numOfPoints1), (normalizedFeaturesSum2, numOfPoints2)) =>
    +          BLAS.axpy(1.0, normalizedFeaturesSum2, normalizedFeaturesSum1)
    +          (normalizedFeaturesSum1, numOfPoints1 + numOfPoints2)
    +      }
    +    )
    +
    +    clustersStatsRDD
    +      .collectAsMap()
    +      .mapValues {
    --- End diff --
    
    That would work too, but it seems rather pointless to me, since here we are 
dealing with the clusters, which are assumed to be few: usually there are far 
fewer than 100.
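
For readers following the Scaladoc in the diff above, here is a minimal sketch of the identity it describes: the average cosine distance from a point to a cluster can be computed in `O(D)` from the precomputed sum of the cluster's normalized points. It is plain Scala on arrays, not the code in the pull request; the object `CosineSilhouetteSketch` and all of its helper names are hypothetical, and it assumes non-zero vectors.

```scala
object CosineSilhouetteSketch {
  // Euclidean norm of a vector.
  def norm(v: Array[Double]): Double = math.sqrt(v.map(x => x * x).sum)

  // xi_X in the Scaladoc: the point scaled to unit length (assumes norm > 0).
  def normalize(v: Array[Double]): Array[Double] = {
    val n = norm(v)
    v.map(_ / n)
  }

  def dot(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => x * y }.sum

  // Direct O(N * D) computation: mean of (1 - cosine similarity) over the cluster.
  def avgDistanceDirect(x: Array[Double], cluster: Seq[Array[Double]]): Double =
    cluster.map(c => 1.0 - dot(x, c) / (norm(x) * norm(c))).sum / cluster.size

  // Omega_Gamma in the Scaladoc: element-wise sum of the normalized cluster points.
  def omega(cluster: Seq[Array[Double]]): Array[Double] =
    cluster.map(normalize).reduce((a, b) => a.zip(b).map { case (p, q) => p + q })

  // O(D) computation from the precomputed Omega_Gamma and the cluster size N.
  def avgDistancePrecomputed(x: Array[Double], omegaGamma: Array[Double], n: Long): Double =
    1.0 - dot(normalize(x), omegaGamma) / n

  def main(args: Array[String]): Unit = {
    val cluster = Seq(Array(1.0, 0.0), Array(1.0, 1.0), Array(0.0, 2.0))
    val x = Array(3.0, 4.0)
    val direct = avgDistanceDirect(x, cluster)
    val fast = avgDistancePrecomputed(x, omega(cluster), cluster.size.toLong)
    // Both routes should agree up to floating-point rounding.
    assert(math.abs(direct - fast) < 1e-12)
    println(s"direct=$direct precomputed=$fast")
  }
}
```

Only one `(Omega, N)` pair per cluster is needed for the fast path, which is why the per-cluster statistics stay small when the number of clusters is small.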

