Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20396#discussion_r163941171
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala ---
    @@ -421,13 +460,220 @@ private[evaluation] object SquaredEuclideanSilhouette {
           computeSilhouetteCoefficient(bClustersStatsMap, _: Vector, _: Double, _: Double)
         }
     
    -    val silhouetteScore = dfWithSquaredNorm
    -      .select(avg(
    -        computeSilhouetteCoefficientUDF(
    -          col(featuresCol), col(predictionCol).cast(DoubleType), col("squaredNorm"))
    -      ))
    -      .collect()(0)
    -      .getDouble(0)
    +    val silhouetteScore = overallScore(dfWithSquaredNorm,
    +      computeSilhouetteCoefficientUDF(col(featuresCol), col(predictionCol).cast(DoubleType),
    +        col("squaredNorm")))
    +
    +    bClustersStatsMap.destroy()
    +
    +    silhouetteScore
    +  }
    +}
    +
    +
    +/**
    + * The algorithm implemented in this object, instead, is an efficient and parallel
    + * implementation of the Silhouette using the cosine distance measure. The cosine distance
    + * measure is defined as `1 - s` where `s` is the cosine similarity between two points.
    + *
    + * The total distance of the point `X` to the points `$C_{i}$` belonging to the cluster
    + * `$\Gamma$` is:
    + *
    + * <blockquote>
    + *   $$
    + *   \sum\limits_{i=1}^N d(X, C_{i}) =
    + *   \sum\limits_{i=1}^N \Big( 1 - \frac{\sum\limits_{j=1}^D x_{j}c_{ij}}{\|X\|\|C_{i}\|} \Big)
    + *   = \sum\limits_{i=1}^N 1 - \sum\limits_{i=1}^N \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \frac{c_{ij}}{\|C_{i}\|}
    + *   = N - \sum\limits_{j=1}^D \frac{x_{j}}{\|X\|} \Big( \sum\limits_{i=1}^N \frac{c_{ij}}{\|C_{i}\|} \Big)
    + *   $$
    + * </blockquote>
    + *
    + * where `$x_{j}$` is the `j`-th dimension of the point `X` and `$c_{ij}$` is the `j`-th
    + * dimension of the `i`-th point in cluster `$\Gamma$`.
    + *
    + * Then, we can define the vector:
    + *
    + * <blockquote>
    + *   $$
    + *   \xi_{X} : \xi_{X i} = \frac{x_{i}}{\|X\|}, i = 1, ..., D
    + *   $$
    + * </blockquote>
    + *
    + * which can be precomputed for each point, and the vector
    + *
    + * <blockquote>
    + *   $$
    + *   \Omega_{\Gamma} : \Omega_{\Gamma i} = \sum\limits_{j=1}^N \xi_{C_{j}i}, i = 1, ..., D
    + *   $$
    + * </blockquote>
    + *
    + * which can also be precomputed for each cluster `$\Gamma$` from its points `$C_{i}$`.
    + *
    + * With these definitions, the numerator of the average distance becomes:
    + *
    + * <blockquote>
    + *   $$
    + *   N - \sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}
    + *   $$
    + * </blockquote>
    + *
    + * Thus the average distance of a point `X` to the points of the cluster `$\Gamma$` is:
    + *
    + * <blockquote>
    + *   $$
    + *   1 - \frac{\sum\limits_{j=1}^D \xi_{X j} \Omega_{\Gamma j}}{N}
    + *   $$
    + * </blockquote>
    + *
    + * In the implementation, the precomputed values for the clusters are distributed among the
    + * worker nodes via broadcast variables, because we can assume that the clusters are limited
    + * in number.
    + *
    + * The main strengths of this algorithm are the low computational complexity and the intrinsic
    + * parallelism. The precomputed information for each point and for each cluster can be computed
    + * with a computational complexity of `O(N/W)`, where `N` is the number of points in the
    + * dataset and `W` is the number of worker nodes. After that, every point can be analyzed
    + * independently of the others.
    + *
    + * For every point we need to compute the average distance to all the clusters. Since the
    + * formula above requires `O(D)` operations, this phase has a computational complexity of
    + * `O(C*D*N/W)`, where `C` is the number of clusters (which we assume to be quite low), `D` is
    + * the number of dimensions, `N` is the number of points in the dataset and `W` is the number
    + * of worker nodes.
    + */
    +private[evaluation] object CosineSilhouette extends Silhouette {
    +
    +  private[this] var kryoRegistrationPerformed: Boolean = false
    +
    +  private[this] val normalizedFeaturesColName = "normalizedFeatures"
    +
    +  /**
    +   * This method registers the class
    +   * [[org.apache.spark.ml.evaluation.CosineSilhouette.ClusterStats]]
    +   * for kryo serialization.
    +   *
    +   * @param sc `SparkContext` to be used
    +   */
    +  def registerKryoClasses(sc: SparkContext): Unit = {
    +    if (!kryoRegistrationPerformed) {
    +      sc.getConf.registerKryoClasses(
    +        Array(
    +          classOf[CosineSilhouette.ClusterStats]
    +        )
    +      )
    +      kryoRegistrationPerformed = true
    +    }
    +  }
    +
    +  case class ClusterStats(normalizedFeatureSum: Vector, numOfPoints: Long)
    +
    +  /**
    +   * The method takes the input dataset and computes, for each cluster, the aggregated
    +   * values needed by the algorithm.
    +   *
    +   * @param df The DataFrame which contains the input data
    +   * @param predictionCol The name of the column which contains the predicted cluster id
    +   *                      for the point.
    +   * @return A [[scala.collection.immutable.Map]] which associates each cluster id to a
    +   *         [[ClusterStats]] object (which contains the precomputed values `N` and
    +   *         `$\Omega_{\Gamma}$`).
    +   */
    +  def computeClusterStats(df: DataFrame, predictionCol: String): Map[Double, ClusterStats] = {
    +    val numFeatures = df.select(col(normalizedFeaturesColName)).first().getAs[Vector](0).size
    +    val clustersStatsRDD = df.select(
    +      col(predictionCol).cast(DoubleType), col(normalizedFeaturesColName))
    +      .rdd
    +      .map { row => (row.getDouble(0), row.getAs[Vector](1)) }
    +      .aggregateByKey[(DenseVector, Long)]((Vectors.zeros(numFeatures).toDense, 0L))(
    +      seqOp = {
    +        case ((normalizedFeaturesSum: DenseVector, numOfPoints: Long), normalizedFeatures) =>
    +          BLAS.axpy(1.0, normalizedFeatures, normalizedFeaturesSum)
    +          (normalizedFeaturesSum, numOfPoints + 1)
    +      },
    +      combOp = {
    +        case ((normalizedFeaturesSum1, numOfPoints1), (normalizedFeaturesSum2, numOfPoints2)) =>
    +          BLAS.axpy(1.0, normalizedFeaturesSum2, normalizedFeaturesSum1)
    +          (normalizedFeaturesSum1, numOfPoints1 + numOfPoints2)
    +      }
    +    )
    +
    +    clustersStatsRDD
    +      .collectAsMap()
    +      .mapValues {
    --- End diff ---
    
    What about `mapValues` on the RDD before collecting, or does that not work?
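
    Roughly what I have in mind, as a minimal sketch (the tail of the method is cut off above, so
    the exact conversion into `ClusterStats` is assumed from its signature; the names are reused
    from the diff):

    ```scala
    // Hypothetical sketch only: build the ClusterStats values on the RDD via
    // PairRDDFunctions.mapValues and then collect, rather than mapping the
    // collected scala.collection.Map on the driver.
    clustersStatsRDD
      .mapValues { case (normalizedFeaturesSum, numOfPoints) =>
        ClusterStats(normalizedFeaturesSum, numOfPoints)
      }
      .collectAsMap()
      .toMap // collectAsMap() returns scala.collection.Map; toMap matches the declared immutable Map
    ```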

