Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/4254#discussion_r23727672
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/PIClustering.scala ---
@@ -0,0 +1,433 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
+import org.apache.log4j.Logger
+import org.apache.spark.SparkContext
+import org.apache.spark.graphx._
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.rdd.RDD
+
+import scala.language.existentials
+
+/**
+ * Implements the scalable graph clustering algorithm Power Iteration Clustering (see
+ * www.icml2010.org/papers/387.pdf). From the abstract:
+ *
+ * The input data is first transformed to a normalized Affinity Matrix via Gaussian pairwise
+ * distance calculations. Power iteration is then used to find a dimensionality-reduced
+ * representation. The resulting pseudo-eigenvector provides effective clustering, as
+ * performed by Parallel KMeans.
+ */
+object PIClustering {
+
+ private val logger = Logger.getLogger(getClass.getName())
+
+ type LabeledPoint = (VertexId, BDV[Double])
+ type Points = Seq[LabeledPoint]
+ type DGraph = Graph[Double, Double]
+ type IndexedVector = (Long, BDV[Double])
+
+ // Terminate iteration when norm changes by less than this value
+ private[mllib] val DefaultMinNormChange: Double = 1e-11
+
+ // Default σ for Gaussian Distance calculations
+ private[mllib] val DefaultSigma = 1.0
+
+ // Default number of iterations for PIC loop
+ private[mllib] val DefaultIterations: Int = 20
+
+ // Default minimum affinity between two points: below this it is considered
+ // zero and no edge will be created
+ private[mllib] val DefaultMinAffinity = 1e-11
+
+ // Do not allow divide by zero: change to this value instead
+ val DefaultDivideByZeroVal: Double = 1e-15
+
+ // Default number of runs by the KMeans.run() method
+ val DefaultKMeansRuns = 10
+
+ /**
+ *
+ * Run a Power Iteration Clustering
+ *
+ * @param sc Spark Context
+ * @param points Input Points in format of [(VertexId,(x,y))]
+ * where VertexId is a Long
+ * @param nClusters Number of clusters to create
+ * @param nIterations Number of iterations of the PIC algorithm
+ * that calculates the primary PseudoEigenvector and Eigenvalue
+ * @param sigma Sigma for the Gaussian distribution calculation according to
+ * [1 / (sigma * sqrt(2 * pi))] * exp(-(x - y)**2 / (2 * sigma**2))
+ * @param minAffinity Minimum Affinity between two Points in the input dataset: below
+ * this threshold the affinity will be considered "close to" zero and
+ * no Edge will be created between those Points in the sparse matrix
+ * @param nRuns Number of runs for the KMeans clustering
+ * @return Tuple of (Seq[(Cluster Id, Cluster Center)],
+ * Seq[((VertexId, Vector), ClusterID Membership)])
+ */
+ def run(sc: SparkContext,
+ points: Points,
+ nClusters: Int,
+ nIterations: Int = DefaultIterations,
+ sigma: Double = DefaultSigma,
+ minAffinity: Double = DefaultMinAffinity,
+ nRuns: Int = DefaultKMeansRuns)
+ : (Seq[(Int, Vector)], Seq[((VertexId, Vector), Int)]) = {
+ val vidsRdd = sc.parallelize(points.map(_._1).sorted)
+ val nVertices = points.length
+
+ val (wRdd, rowSums) = createNormalizedAffinityMatrix(sc, points, sigma)
+ val initialVt = createInitialVector(sc, points.map(_._1), rowSums)
+ if (logger.isDebugEnabled) {
+ logger.debug(s"Vt(0)=${
+ printVector(new BDV(initialVt.map {
+ _._2
+ }.toArray))
+ }")
+ }
+ val edgesRdd = createSparseEdgesRdd(sc, wRdd, minAffinity)
+ val G = createGraphFromEdges(sc, edgesRdd, points.size, Some(initialVt))
+ if (logger.isDebugEnabled) {
+ logger.debug(printMatrixFromEdges(G.edges))
+ }
+ val (gUpdated, lambda, vt) = getPrincipalEigen(sc, G, nIterations)
+ // TODO: avoid local collect and then sc.parallelize.
+ val localVt = vt.collect.sortBy(_._1)
+ val vectRdd = sc.parallelize(localVt.map(v => (v._1, Vectors.dense(v._2.toArray))))
+ vectRdd.cache()
+ val model = KMeans.train(vectRdd.map(_._2), nClusters, nRuns)
+ vectRdd.unpersist()
+ if (logger.isDebugEnabled) {
+ logger.debug(s"Eigenvalue = $lambda EigenVector:
${localVt.mkString(",")}")
+ }
+ val estimates = vectRdd.zip(model.predict(vectRdd.map(_._2)))
+ if (logger.isDebugEnabled) {
+ logger.debug(s"lambda=$lambda eigen=${localVt.mkString(",")}")
+ }
+ val ccs = (0 until model.clusterCenters.length).zip(model.clusterCenters)
+ if (logger.isDebugEnabled) {
+ logger.debug(s"Kmeans model cluster centers: ${ccs.mkString(",")}")
+ }
+ val estCollected = estimates.collect.sortBy(_._1._1)
+ if (logger.isDebugEnabled) {
+ val counts = estCollected.groupBy(_._2).mapValues(_.length)
+ logger.debug(s"Cluster counts: Counts: ${counts.mkString(",")}"
+ + s"\nCluster Estimates: ${estCollected.mkString(",")}")
+ }
+ (ccs, estCollected)
+ }
+
+ /**
+ * Read Points from an input file in the following format:
+ * Vertex1Id Coord11 Coord12 Coord13 .. Coord1D
+ * Vertex2Id Coord21 Coord22 Coord23 .. Coord2D
+ * ..
+ * VertexNId CoordN1 CoordN2 CoordN3 .. CoordND
+ *
+ * Where N is the number of observations, each a D-dimension point
+ *
+ * E.g.
+ *
+ * 19 1.8035177495 0.7460582552 0.2361611395 -0.8645567427 -0.8613062
+ * 10 0.5534111111 1.0456386879 1.7045663273 0.7281759816 1.0807487792
+ * 911 1.200749626 1.8962364439 2.5117192131 -0.4034737281 -0.9069696484
+ *
+ * Which represents three 5-dimensional input Points with VertexIds 19, 10, and 911
+ * @param verticesFile Local filesystem path to the Points input file
+ * @return Set of Vertices in format appropriate for consumption by the PIC algorithm
+ */
+ def readVerticesfromFile(verticesFile: String): Points = {
--- End diff --
Let's not handle I/O here. We can have example code under `examples/` and load files there.
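
For illustration, a minimal sketch of what such an example under `examples/` might look like. The object name, file path, and argument handling here are hypothetical, not part of this PR; it assumes the `PIClustering.run` signature and the whitespace-delimited input format documented above:

```scala
// Hypothetical example, e.g.
// examples/src/main/scala/org/apache/spark/examples/mllib/PIClusteringExample.scala
package org.apache.spark.examples.mllib

import breeze.linalg.{DenseVector => BDV}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.VertexId
import org.apache.spark.mllib.clustering.PIClustering

object PIClusteringExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PIClusteringExample"))

    // Parse the format documented on readVerticesfromFile:
    // "VertexId Coord1 Coord2 .. CoordD", one whitespace-delimited point per line.
    val points: Seq[(VertexId, BDV[Double])] =
      scala.io.Source.fromFile(args(0)).getLines().map { line =>
        val tokens = line.trim.split("""\s+""")
        (tokens.head.toLong, new BDV(tokens.tail.map(_.toDouble)))
      }.toSeq

    // Run PIC with the defaults for iterations, sigma, minAffinity, and KMeans runs.
    val (centers, assignments) = PIClustering.run(sc, points, nClusters = 3)

    centers.foreach { case (id, center) => println(s"Cluster $id center: $center") }
    assignments.foreach { case ((vid, _), clusterId) =>
      println(s"Vertex $vid -> cluster $clusterId")
    }
    sc.stop()
  }
}
```

Keeping the file parsing in `examples/` leaves the `mllib` API purely collection/RDD-based, consistent with how the other clustering algorithms in this package are exposed.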