Github user huaxingao commented on a diff in the pull request:
https://github.com/apache/spark/pull/21513#discussion_r194190113
--- Diff: python/pyspark/ml/clustering.py ---
@@ -1156,6 +1159,216 @@ def getKeepLastCheckpoint(self):
return self.getOrDefault(self.keepLastCheckpoint)
+@inherit_doc
+class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, JavaMLReadable,
+ JavaMLWritable):
+ """
+ .. note:: Experimental
+
+ Power Iteration Clustering (PIC), a scalable graph clustering algorithm developed by
+ <a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From the abstract:
+ PIC finds a very low-dimensional embedding of a dataset using truncated power
+ iteration on a normalized pair-wise similarity matrix of the data.
+
+ This class is not yet an Estimator/Transformer, use `assignClusters` method to run the
+ PowerIterationClustering algorithm.
+
+ .. seealso:: `Wikipedia on Spectral clustering \
+ <http://en.wikipedia.org/wiki/Spectral_clustering>`_
+
+ >>> from pyspark.sql.types import StructField, StructType
+ >>> import math
+ >>> def genCircle(r, n):
+ ... points = []
+ ... for i in range(0, n):
+ ... theta = 2.0 * math.pi * i / n
+ ... points.append((r * math.cos(theta), r * math.sin(theta)))
+ ... return points
+ >>> def sim(x, y):
+ ... dist = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - y[1])
+ ... return math.exp(-dist / 2.0)
+ >>> r1 = 1.0
+ >>> n1 = 10
+ >>> r2 = 4.0
+ >>> n2 = 40
+ >>> n = n1 + n2
+ >>> points = genCircle(r1, n1) + genCircle(r2, n2)
+ >>> data = [(i, j, sim(points[i], points[j])) for i in range(1, n) for j in range(0, i)]
+ >>> rdd = sc.parallelize(data, 2)
+ >>> schema = StructType([StructField("src", LongType(), False), \
+ StructField("dst", LongType(), True), \
+ StructField("weight", DoubleType(), True)])
+ >>> df = spark.createDataFrame(rdd, schema)
+ >>> pic = PowerIterationClustering()
+ >>> assignments = pic.setK(2).setMaxIter(40).setWeightCol("weight").assignClusters(df)
+ >>> result = sorted(assignments.collect(), key=lambda x: x.id)
+ >>> result[0].cluster == result[1].cluster == result[2].cluster == result[3].cluster
+ True
+ >>> result[4].cluster == result[5].cluster == result[6].cluster == result[7].cluster
+ True
+ >>> pic_path = temp_path + "/pic"
+ >>> pic.save(pic_path)
+ >>> pic2 = PowerIterationClustering.load(pic_path)
+ >>> pic2.getK()
+ 2
+ >>> pic2.getMaxIter()
+ 40
+ >>> assignments2 = pic2.assignClusters(df)
+ >>> result2 = sorted(assignments2.collect(), key=lambda x: x.id)
+ >>> result2[0].cluster == result2[1].cluster == result2[2].cluster == result2[3].cluster
+ True
+ >>> result2[4].cluster == result2[5].cluster == result2[6].cluster == result2[7].cluster
+ True
+ >>> pic3 = PowerIterationClustering(k=4, initMode="degree", srcCol="source", dstCol="dest")
+ >>> pic3.getSrcCol()
+ 'source'
+ >>> pic3.getDstCol()
+ 'dest'
+ >>> pic3.getK()
+ 4
+ >>> pic3.getMaxIter()
+ 20
+ >>> pic3.getInitMode()
+ 'degree'
+
+ .. versionadded:: 2.4.0
+ """
+
+ k = Param(Params._dummy(), "k",
+ "The number of clusters to create. Must be > 1.",
+ typeConverter=TypeConverters.toInt)
+ initMode = Param(Params._dummy(), "initMode",
+ "The initialization algorithm. This can be either " +
+ "'random' to use a random vector as vertex properties, or 'degree' to use " +
+ "a normalized sum of similarities with other vertices. Supported options: " +
+ "'random' and 'degree'.",
+ typeConverter=TypeConverters.toString)
+ srcCol = Param(Params._dummy(), "srcCol",
+ "Name of the input column for source vertex IDs.",
+ typeConverter=TypeConverters.toString)
+ dstCol = Param(Params._dummy(), "dstCol",
+ "Name of the input column for destination vertex IDs.",
+ typeConverter=TypeConverters.toString)
+
+ @keyword_only
+ def __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
+ weightCol=None):
+ """
+ __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
+ weightCol=None)
+ """
+ super(PowerIterationClustering, self).__init__()
+ self._java_obj = self._new_java_obj(
+ "org.apache.spark.ml.clustering.PowerIterationClustering", self.uid)
+ self._setDefault(k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst")
+ kwargs = self._input_kwargs
+ self.setParams(**kwargs)
+
+ @keyword_only
+ @since("2.4.0")
+ def setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",
+ weightCol=None):
+ """
+ setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", dstCol="dst",\
+ weightCol=None)
+ Sets params for PowerIterationClustering.
+ """
+ kwargs = self._input_kwargs
+ return self._set(**kwargs)
+
+ @since("2.4.0")
+ def setK(self, value):
+ """
+ Sets the value of :py:attr:`k`.
+ """
+ return self._set(k=value)
+
+ @since("2.4.0")
+ def getK(self):
+ """
+ Gets the value of :py:attr:`k`.
+ """
+ return self.getOrDefault(self.k)
+
+ @since("2.4.0")
+ def setInitMode(self, value):
+ """
+ Sets the value of :py:attr:`initMode`.
+ """
+ return self._set(initMode=value)
+
+ @since("2.4.0")
+ def getInitMode(self):
+ """
+ Gets the value of `initMode`
+ """
+ return self.getOrDefault(self.initMode)
+
+ @since("2.4.0")
+ def setSrcCol(self, value):
+ """
+ Sets the value of :py:attr:`srcCol`.
+ """
+ return self._set(srcCol=value)
+
+ @since("2.4.0")
+ def getSrcCol(self):
+ """
+ Gets the value of :py:attr:`srcCol`.
+ """
+ return self.getOrDefault(self.srcCol)
+
+ @since("2.4.0")
+ def setDstCol(self, value):
+ """
+ Sets the value of :py:attr:`dstCol`.
+ """
+ return self._set(dstCol=value)
+
+ @since("2.4.0")
+ def getDstCol(self):
+ """
+ Gets the value of :py:attr:`dstCol`.
+ """
+ return self.getOrDefault(self.dstCol)
+
+ @since("2.4.0")
+ def assignClusters(self, dataset):
+ """
+ Run the PIC algorithm and returns a cluster assignment for each input vertex.
+
+ :param dataset:
+ A dataset with columns src, dst, weight representing the affinity matrix,
+ which is the matrix A in the PIC paper. Suppose the src column value is i,
+ the dst column value is j, the weight column value is similarity s,,ij,,
+ which must be nonnegative. This is a symmetric matrix and hence
+ s,,ij,, = s,,ji,,. For any (i, j) with nonzero similarity, there should be
+ either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input. Rows with i = j are
+ ignored, because we assume s,,ij,, = 0.0.
+
+ :return: A dataset that contains columns of vertex id and the corresponding cluster for
+ the id. The schema of it will be:
+ - id: Long
+ - cluster: Int
+ """
+ weightCol = None
+ w = None
+ if (self.isDefined(self.weightCol)):
+ weightCol = self.getWeightCol()
+ if (weightCol is None or len(weightCol) == 0):
+ w = lit(1.0)
+ else:
+ w = col(weightCol).cast(DoubleType())
+ srcCol = self.getSrcCol()
+ dstCol = self.getDstCol()
+ df = dataset.select(col(srcCol).cast(LongType()), col(dstCol).cast(LongType()), w)
+ data = df.rdd.map(lambda x: (x[0], x[1], x[2]))
+ algorithm = MLlibPowerIterationClustering()
--- End diff --
@WeichenXu123 Thank you very much for your comment. Will change now.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]