Github user freeman-lab commented on a diff in the pull request:
https://github.com/apache/spark/pull/2906#discussion_r22632804
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/clustering/HierarchicalClustering.scala
---
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector =>
BV, norm => breezeNorm}
+import org.apache.spark.Logging
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.random.XORShiftRandom
+
+/**
+ * This trait is used for the configuration of the hierarchical clustering
+ */
+sealed
+trait HierarchicalClusteringConf extends Serializable {
+ this: HierarchicalClustering =>
+
+ def setNumClusters(numClusters: Int): this.type = {
+ this.numClusters = numClusters
+ this
+ }
+
+ def getNumClusters(): Int = this.numClusters
+
+ def setNumRetries(numRetries: Int): this.type = {
+ this.numRetries = numRetries
+ this
+ }
+
+ def getNumRetries(): Int = this.numRetries
+
+ def setSubIterations(subIterations: Int): this.type = {
+ this.subIterations = subIterations
+ this
+ }
+
+ def getSubIterations(): Int = this.subIterations
+
+ def setEpsilon(epsilon: Double): this.type = {
+ this.epsilon = epsilon
+ this
+ }
+
+ def getEpsilon(): Double = this.epsilon
+
+ def setRandomSeed(seed: Int): this.type = {
+ this.randomSeed = seed
+ this
+ }
+
+ def getRandomSeed(): Int = this.randomSeed
+
+ def setRandomRange(range: Double): this.type = {
+ this.randomRange = range
+ this
+ }
+}
+
+
+/**
+ * This is a divisive hierarchical clustering algorithm based on bi-sect
k-means algorithm.
+ *
+ * The main idea of this algorithm is derived from:
+ * "A comparison of document clustering techniques",
+ * M. Steinbach, G. Karypis and V. Kumar. Workshop on Text Mining, KDD,
2000.
+ * http://cs.fit.edu/~pkc/classes/ml-internet/papers/steinbach00tr.pdf
+ *
+ * @param numClusters the number of clusters you want
+ * @param subIterations the number of iterations at digging
+ * @param epsilon the threshold to stop the sub-iterations
+ * @param randomSeed uses in sampling data for initializing centers in
each sub iterations
+ * @param randomRange the range coefficient to generate random points in
each clustering step
+ */
+class HierarchicalClustering(
+ private[mllib] var numClusters: Int,
+ private[mllib] var subIterations: Int,
+ private[mllib] var numRetries: Int,
+ private[mllib] var epsilon: Double,
+ private[mllib] var randomSeed: Int,
+ private[mllib] var randomRange: Double)
+ extends Serializable with Logging with HierarchicalClusteringConf {
+
+ /**
+ * Constructs with the default configuration
+ */
+ def this() = this(20, 20, 10, 10E-4, 1, 0.1)
+
+ /** Shows the parameters */
+ override def toString(): String = {
+ Array(
+ s"numClusters:${numClusters}",
+ s"subIterations:${subIterations}",
+ s"numRetries:${numRetries}",
+ s"epsilon:${epsilon}",
+ s"randomSeed:${randomSeed}",
+ s"randomRange:${randomRange}"
+ ).mkString(", ")
+ }
+
+ /**
+ * Trains a hierarchical clustering model with the given configuration
+ *
+ * @param data training points
+ * @return a model for hierarchical clustering
+ */
+ def run(data: RDD[Vector]): HierarchicalClusteringModel = {
+ validateData(data)
+ logInfo(s"Run with ${this}")
+
+ val startTime = System.currentTimeMillis() // to measure the execution
time
+ val clusterTree = ClusterTree.fromRDD(data) // make the root node
+ val model = new HierarchicalClusteringModel(clusterTree)
+ val statsUpdater = new ClusterTreeStatsUpdater()
+
+ var node: Option[ClusterTree] = Some(model.clusterTree)
+ statsUpdater(node.get)
+
+ // If the followed conditions are satisfied, and then stop the
training.
+ // 1. There is no splittable cluster
+ // 2. The number of the splitted clusters is greater than that of
given clusters
+ var totalVariance = Double.MaxValue
+ var newTotalVariance = model.clusterTree.getVariance().get
+ var step = 1
+ while (node != None
+ && model.clusterTree.getTreeSize() < this.numClusters) {
+
+ // split some times in order not to be wrong clustering result
+ var isMerged = false
+ for (i <- 1 to this.numRetries) {
+ if (node.get.getVariance().get > this.epsilon && isMerged ==
false) {
+ var subNodes = split(node.get).map(subNode =>
statsUpdater(subNode))
+ if (subNodes.size == 2) {
+ // insert the nodes to the tree
+ node.get.insert(subNodes.toList)
+ // calculate the local dendrogram height
+ val dist = breezeNorm(subNodes(0).center.toBreeze -
subNodes(1).center.toBreeze, 2)
+ node.get.height = Some(dist)
+ // unpersist unnecessary cache because its children nodes are
cached
+ node.get.data.unpersist()
+ logInfo(s"the number of cluster is
${model.clusterTree.getTreeSize()} at step ${step}")
+ isMerged = true
+ }
+ }
+ }
+ node.get.isVisited = true
+
+ // update the total variance and select the next splittable node
+ node = nextNode(model.clusterTree)
+ step += 1
+ }
+
+ model.isTrained = true
+ val trainTime = (System.currentTimeMillis() - startTime).toInt
+ logInfo(s"Elapsed Time for Training: ${trainTime.toDouble / 1000}
[sec]")
+ model
+ }
+
+ /**
+ * validate the given data to train
+ */
+ private def validateData(data: RDD[Vector]) {
+ require(this.numClusters <= data.count(), "# clusters must be less
than # data rows")
+ }
+
+ /**
+ * Selects the next node to split
+ */
+ private[clustering] def nextNode(clusterTree: ClusterTree):
Option[ClusterTree] = {
+ // select the max variance of clusters which are leaves of a tree
+ clusterTree.toSeq().filter(tree => tree.isSplittable() &&
!tree.isVisited) match {
+ case list if list.isEmpty => None
+ case list => Some(list.maxBy(_.getVariance()))
+ }
+ }
+
+ /**
+ * Takes the initial centers for bi-sect k-means
+ */
+ private[clustering] def takeInitCenters(centers: Vector):
Array[BV[Double]] = {
+ val random = new XORShiftRandom()
+ Array(
+ centers.toBreeze.map(elm => elm - random.nextDouble() * elm *
this.randomRange),
+ centers.toBreeze.map(elm => elm + random.nextDouble() * elm *
this.randomRange)
+ )
+ }
+
+ /**
+ * Splits the given cluster (tree) with bi-sect k-means
+ *
+ * @param clusterTree the splitted cluster
+ * @return an array of ClusterTree. its size is generally 2, but its
size can be 1
+ */
+ private def split(clusterTree: ClusterTree): Array[ClusterTree] = {
+ val startTime = System.currentTimeMillis()
+ val data = clusterTree.data
+ val sc = data.sparkContext
+ var centers = takeInitCenters(clusterTree.center)
+
+ // TODO Supports distance metrics other Euclidean distance metric
+ val metric = (bv1: BV[Double], bv2: BV[Double]) => breezeNorm(bv1 -
bv2, 2.0)
+ sc.broadcast(metric)
+
+ // If the following conditions are satisfied, the iteration is stopped
+ // 1. the relative error is less than that of configuration
+ // 2. the number of executed iteration is greater than that of
configuration
+ // 3. the number of centers is equal to one. if one means that the
cluster is not splittable
+ var numIter = 0
+ var error = Double.MaxValue
+ while (error > this.epsilon
+ && numIter < this.subIterations
+ && centers.size > 1) {
+ val startTimeOfIter = System.currentTimeMillis()
+
+ sc.broadcast(centers)
+ val newCenters = data.mapPartitions { iter =>
+ // calculate the accumulation of the all point in a partition and
count the rows
+ val map = scala.collection.mutable.Map.empty[Int, (BV[Double],
Int)]
+ iter.foreach { point =>
+ val idx = ClusterTree.findClosestCenter(metric)(centers)(point)
+ val (sumBV, n) = map.get(idx)
+ .getOrElse((new BSV[Double](Array(), Array(), point.size),
0))
+ map(idx) = (sumBV + point, n + 1)
+ }
+ map.toIterator
+ }.reduceByKeyLocally {
+ // sum the accumulation and the count in the all partition
+ case ((p1, n1), (p2, n2)) => (p1 + p2, n1 + n2)
+ }.map { case ((idx: Int, (center: BV[Double], counts: Int))) =>
+ center :/ counts.toDouble
+ }
+
+ val normSum = centers.map(v => breezeNorm(v, 2.0)).sum
+ val newNormSum = newCenters.map(v => breezeNorm(v, 2.0)).sum
+ error = math.abs((normSum - newNormSum) / normSum)
+ centers = newCenters.toArray
+ numIter += 1
+
+ logInfo(s"${numIter} iterations is finished" +
+ s" for ${System.currentTimeMillis() - startTimeOfIter}" +
+ s" at ${getClass}.split")
+ }
+
+ val vectors = centers.map(center => Vectors.fromBreeze(center))
+ val nodes = centers.size match {
+ case 1 => Array(new ClusterTree(vectors(0), data))
+ case 2 => {
+ val closest = data.map(p =>
(ClusterTree.findClosestCenter(metric)(centers)(p), p))
+ centers.zipWithIndex.map { case (center, i) =>
+ val subData = closest.filter(_._1 == i).map(_._2)
+ subData.cache
+ new ClusterTree(vectors(i), subData)
+ }
+ }
+ case _ => throw new RuntimeException(s"something wrong with #
centers:${centers.size}")
+ }
+ logInfo(s"${this.getClass.getSimpleName}.split end" +
+ s" with total iterations" +
+ s" for ${System.currentTimeMillis() - startTime}")
+ nodes
+ }
+}
+
+/**
+ * top-level methods for calling the hierarchical clustering algorithm
+ */
+object HierarchicalClustering {
+
+ /**
+ * Trains a hierarchical clustering model with the given data
+ *
+ * @param data trained data
+ * @param numClusters the maximum number of clusters you want
+ * @return a hierarchical clustering model
+ */
+ def train(data: RDD[Vector], numClusters: Int):
HierarchicalClusteringModel = {
+ val app = new HierarchicalClustering().setNumClusters(numClusters)
+ app.run(data)
+ }
+
+ /**
+ * Trains a hierarchical clustering model with the given data
+ *
+ * @param data trained data
+ * @param numClusters the maximum number of clusters you want
+ * @param subIterations the iteration of
--- End diff --
Incomplete sentence?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]