Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21513#discussion_r194190113
  
    --- Diff: python/pyspark/ml/clustering.py ---
    @@ -1156,6 +1159,216 @@ def getKeepLastCheckpoint(self):
             return self.getOrDefault(self.keepLastCheckpoint)
     
     
    +@inherit_doc
    +class PowerIterationClustering(HasMaxIter, HasWeightCol, JavaParams, 
JavaMLReadable,
    +                               JavaMLWritable):
    +    """
    +    .. note:: Experimental
    +
    +    Power Iteration Clustering (PIC), a scalable graph clustering 
algorithm developed by
    +    <a href=http://www.icml2010.org/papers/387.pdf>Lin and Cohen</a>. From 
the abstract:
    +    PIC finds a very low-dimensional embedding of a dataset using 
truncated power
    +    iteration on a normalized pair-wise similarity matrix of the data.
    +
    +    This class is not yet an Estimator/Transformer, use `assignClusters` 
method to run the
    +    PowerIterationClustering algorithm.
    +
    +    .. seealso:: `Wikipedia on Spectral clustering \
    +    <http://en.wikipedia.org/wiki/Spectral_clustering>`_
    +
    +    >>> from pyspark.sql.types import StructField, StructType
    +    >>> import math
    +    >>> def genCircle(r, n):
    +    ...     points = []
    +    ...     for i in range(0, n):
    +    ...         theta = 2.0 * math.pi * i / n
    +    ...         points.append((r * math.cos(theta), r * math.sin(theta)))
    +    ...     return points
    +    >>> def sim(x, y):
    +    ...     dist = (x[0] - y[0]) * (x[0] - y[0]) + (x[1] - y[1]) * (x[1] - 
y[1])
    +    ...     return math.exp(-dist / 2.0)
    +    >>> r1 = 1.0
    +    >>> n1 = 10
    +    >>> r2 = 4.0
    +    >>> n2 = 40
    +    >>> n = n1 + n2
    +    >>> points = genCircle(r1, n1) + genCircle(r2, n2)
    +    >>> data = [(i, j, sim(points[i], points[j])) for i in range(1, n) for 
j in range(0, i)]
    +    >>> rdd = sc.parallelize(data, 2)
    +    >>> schema = StructType([StructField("src", LongType(), False), \
    +                 StructField("dst", LongType(),  True), \
    +                 StructField("weight", DoubleType(), True)])
    +    >>> df = spark.createDataFrame(rdd, schema)
    +    >>> pic = PowerIterationClustering()
    +    >>> assignments = 
pic.setK(2).setMaxIter(40).setWeightCol("weight").assignClusters(df)
    +    >>> result = sorted(assignments.collect(), key=lambda x: x.id)
    +    >>> result[0].cluster == result[1].cluster == result[2].cluster == 
result[3].cluster
    +    True
    +    >>> result[4].cluster == result[5].cluster == result[6].cluster == 
result[7].cluster
    +    True
    +    >>> pic_path = temp_path + "/pic"
    +    >>> pic.save(pic_path)
    +    >>> pic2 = PowerIterationClustering.load(pic_path)
    +    >>> pic2.getK()
    +    2
    +    >>> pic2.getMaxIter()
    +    40
    +    >>> assignments2 = pic2.assignClusters(df)
    +    >>> result2 = sorted(assignments2.collect(), key=lambda x: x.id)
    +    >>> result2[0].cluster == result2[1].cluster == result2[2].cluster == 
result2[3].cluster
    +    True
    +    >>> result2[4].cluster == result2[5].cluster == result2[6].cluster == 
result2[7].cluster
    +    True
    +    >>> pic3 = PowerIterationClustering(k=4, initMode="degree", 
srcCol="source", dstCol="dest")
    +    >>> pic3.getSrcCol()
    +    'source'
    +    >>> pic3.getDstCol()
    +    'dest'
    +    >>> pic3.getK()
    +    4
    +    >>> pic3.getMaxIter()
    +    20
    +    >>> pic3.getInitMode()
    +    'degree'
    +
    +    .. versionadded:: 2.4.0
    +    """
    +
    +    k = Param(Params._dummy(), "k",
    +              "The number of clusters to create. Must be > 1.",
    +              typeConverter=TypeConverters.toInt)
    +    initMode = Param(Params._dummy(), "initMode",
    +                     "The initialization algorithm. This can be either " +
    +                     "'random' to use a random vector as vertex 
properties, or 'degree' to use " +
    +                     "a normalized sum of similarities with other 
vertices.  Supported options: " +
    +                     "'random' and 'degree'.",
    +                     typeConverter=TypeConverters.toString)
    +    srcCol = Param(Params._dummy(), "srcCol",
    +                   "Name of the input column for source vertex IDs.",
    +                   typeConverter=TypeConverters.toString)
    +    dstCol = Param(Params._dummy(), "dstCol",
    +                   "Name of the input column for destination vertex IDs.",
    +                   typeConverter=TypeConverters.toString)
    +
    +    @keyword_only
    +    def __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", 
dstCol="dst",
    +                 weightCol=None):
    +        """
    +        __init__(self, k=2, maxIter=20, initMode="random", srcCol="src", 
dstCol="dst",\
    +                 weightCol=None)
    +        """
    +        super(PowerIterationClustering, self).__init__()
    +        self._java_obj = self._new_java_obj(
    +            "org.apache.spark.ml.clustering.PowerIterationClustering", 
self.uid)
    +        self._setDefault(k=2, maxIter=20, initMode="random", srcCol="src", 
dstCol="dst")
    +        kwargs = self._input_kwargs
    +        self.setParams(**kwargs)
    +
    +    @keyword_only
    +    @since("2.4.0")
    +    def setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", 
dstCol="dst",
    +                  weightCol=None):
    +        """
    +        setParams(self, k=2, maxIter=20, initMode="random", srcCol="src", 
dstCol="dst",\
    +                  weightCol=None)
    +        Sets params for PowerIterationClustering.
    +        """
    +        kwargs = self._input_kwargs
    +        return self._set(**kwargs)
    +
    +    @since("2.4.0")
    +    def setK(self, value):
    +        """
    +        Sets the value of :py:attr:`k`.
    +        """
    +        return self._set(k=value)
    +
    +    @since("2.4.0")
    +    def getK(self):
    +        """
    +        Gets the value of :py:attr:`k`.
    +        """
    +        return self.getOrDefault(self.k)
    +
    +    @since("2.4.0")
    +    def setInitMode(self, value):
    +        """
    +        Sets the value of :py:attr:`initMode`.
    +        """
    +        return self._set(initMode=value)
    +
    +    @since("2.4.0")
    +    def getInitMode(self):
    +        """
    +        Gets the value of `initMode`
    +        """
    +        return self.getOrDefault(self.initMode)
    +
    +    @since("2.4.0")
    +    def setSrcCol(self, value):
    +        """
    +        Sets the value of :py:attr:`srcCol`.
    +        """
    +        return self._set(srcCol=value)
    +
    +    @since("2.4.0")
    +    def getSrcCol(self):
    +        """
    +        Gets the value of :py:attr:`srcCol`.
    +        """
    +        return self.getOrDefault(self.srcCol)
    +
    +    @since("2.4.0")
    +    def setDstCol(self, value):
    +        """
    +        Sets the value of :py:attr:`dstCol`.
    +        """
    +        return self._set(dstCol=value)
    +
    +    @since("2.4.0")
    +    def getDstCol(self):
    +        """
    +        Gets the value of :py:attr:`dstCol`.
    +        """
    +        return self.getOrDefault(self.dstCol)
    +
    +    @since("2.4.0")
    +    def assignClusters(self, dataset):
    +        """
    +        Run the PIC algorithm and returns a cluster assignment for each 
input vertex.
    +
    +        :param dataset:
    +          A dataset with columns src, dst, weight representing the 
affinity matrix,
    +          which is the matrix A in the PIC paper. Suppose the src column 
value is i,
    +          the dst column value is j, the weight column value is similarity 
s,,ij,,
    +          which must be nonnegative. This is a symmetric matrix and hence
    +          s,,ij,, = s,,ji,,. For any (i, j) with nonzero similarity, there 
should be
    +          either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input. Rows 
with i = j are
    +          ignored, because we assume s,,ij,, = 0.0.
    +
    +        :return: A dataset that contains columns of vertex id and the 
corresponding cluster for
    +                 the id. The schema of it will be:
    +                   - id: Long
    +                   - cluster: Int
    +        """
    +        weightCol = None
    +        w = None
    +        if (self.isDefined(self.weightCol)):
    +            weightCol = self.getWeightCol()
    +        if (weightCol is None or len(weightCol) == 0):
    +            w = lit(1.0)
    +        else:
    +            w = col(weightCol).cast(DoubleType())
    +        srcCol = self.getSrcCol()
    +        dstCol = self.getDstCol()
    +        df = dataset.select(col(srcCol).cast(LongType()), 
col(dstCol).cast(LongType()), w)
    +        data = df.rdd.map(lambda x: (x[0], x[1], x[2]))
    +        algorithm = MLlibPowerIterationClustering()
    --- End diff --
    
    @WeichenXu123 Thank you very much for your comment. Will change now. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to