Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/5267#discussion_r43071613
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala ---
@@ -0,0 +1,744 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import scala.collection.{Map, mutable}
+
+import breeze.linalg.{SparseVector => BSV, Vector => BV, norm => breezeNorm, sum => breezeSum}
+
+import org.apache.spark.{Logging, SparkException}
+import org.apache.spark.annotation.Since
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * This is a divisive hierarchical clustering algorithm based on the bisecting k-means algorithm.
+ *
+ * The main idea of this algorithm is based on "A comparison of document clustering techniques",
+ * M. Steinbach, G. Karypis and V. Kumar. Workshop on Text Mining, KDD, 2000.
+ * http://cs.fit.edu/~pkc/classes/ml-internet/papers/steinbach00tr.pdf
+ *
+ * However, we modified it to fit Spark. The algorithm consists of two main parts:
+ *
+ * 1. Split clusters until there are enough clusters to build a cluster tree
+ * 2. Build a binary cluster tree from the split clusters
+ *
+ * First, it splits clusters into their child clusters step by step, without considering whether
+ * a cluster will be included in the final cluster tree. Splitting clusters one at a time is very
+ * slow, so this approach makes the algorithm more efficient on Spark. It keeps splitting until
+ * there are enough clusters to build a cluster tree, and stops early if no dividable clusters
+ * remain before that number is reached. In this part it also calculates the costs, such as the
+ * average cost and entropy, used to build the cluster tree. A cost measures how large a cluster
+ * is; that is, the cluster whose cost is the maximum of all the clusters is the largest cluster.
+ *
+ * Second, it builds a binary cluster tree from the result of the first part.
+ * The cluster tree starts with only the root cluster, which includes all points, so the two
+ * children of the root are the initial candidates to be merged into the cluster tree. It picks
+ * the larger of the two children and merges it into the cluster tree. After that, there are
+ * three candidates to merge: the smaller child of the root and the two children of the merged
+ * cluster. It continues to pick the largest of the candidates and merge it into the cluster
+ * tree until the desired number of clusters is reached.
+ *
+ * @param k the desired number of clusters
+ * @param maxIterations the maximum number of iterations to split clusters
+ * @param seed a random seed
+ */
+@Since("1.6.0")
+class BisectingKMeans private (
+ private var k: Int,
+ private var maxIterations: Int,
+ private var seed: Long) extends Logging {
+
+ import BisectingKMeans._
+
+ /**
+ * Constructs a BisectingKMeans instance with the default configuration
+ */
+ @Since("1.6.0")
+ def this() = this(20, 20, 1)
+
+ /**
+ * Sets the desired number of clusters
+ */
+ @Since("1.6.0")
+ def setK(k: Int): this.type = {
+ this.k = k
+ this
+ }
+
+ @Since("1.6.0")
+ def getK: Int = this.k
+
+ /**
+ * Sets the maximum number of iterations in each clustering step
+ */
+ @Since("1.6.0")
+ def setMaxIterations(maxIterations: Int): this.type = {
+ this.maxIterations = maxIterations
+ this
+ }
+
+ @Since("1.6.0")
+ def getMaxIterations: Int = this.maxIterations
+
+ /**
+ * Sets the random seed
+ */
+ @Since("1.6.0")
+ def setSeed(seed: Long): this.type = {
+ this.seed = seed
+ this
+ }
+
+ @Since("1.6.0")
+ def getSeed: Long = this.seed
+
+ /**
+ * Runs the bisecting k-means algorithm
+ * @param input RDD of vectors
+ * @return the bisecting k-means model
+ */
+ @Since("1.6.0")
+ def run(input: RDD[Vector]): BisectingKMeansModel = {
+ val sc = input.sparkContext
+
+ // `clusterStats` is stored as a binary tree keyed by node index
+ // `clusterStats(1)` is the root of the tree
+ var clusterStats = mutable.Map.empty[Long, BisectingClusterStat]
+ var step = 1
+ var noMoreDividable = false
+ var updatedDataHistory = Array.empty[RDD[(Long, BV[Double])]]
+ // the minimum number of nodes in a binary tree for the given k
+ val numNodeLimit = getMinimumNumNodesInTree(this.k)
+
+ // divide clusters until the number of clusters reaches the limit
+ // or there is no dividable cluster
+ val startTime = System.currentTimeMillis()
+ var data = initData(input).cache()
+ while (clusterStats.size < numNodeLimit && !noMoreDividable) {
+ logInfo(s"${sc.appName} starts step ${step}")
+ // TODO Remove non-leaf cluster stats from `leafClusterStats`
+ val leafClusterStats = summarizeClusters(data)
+ val dividableLeafClusters = leafClusterStats.filter(_._2.isDividable)
+ clusterStats = clusterStats ++ leafClusterStats
+
+ // divide the dividable leaf clusters; if no clusters are divided, clustering is finished
+ val divided = divideClusters(data, dividableLeafClusters, maxIterations)
+ // update each index
+ val newData = updateClusterIndex(data, divided).cache()
+ updatedDataHistory = updatedDataHistory ++ Array(data)
+ data = newData
+ // keep only the 2 most recent RDDs cached in order to run more quickly
+ if (updatedDataHistory.length > 1) {
+ val head = updatedDataHistory.head
+ updatedDataHistory = updatedDataHistory.tail
+ head.unpersist()
+ }
+ clusterStats = clusterStats ++ divided
+ step += 1
+ logInfo(s"${sc.appName} adding ${divided.size} new clusterStats at
step:${step}")
+
+ if (dividableLeafClusters.isEmpty) {
+ noMoreDividable = true
+ }
+ }
+ // create a map of cluster nodes with their costs
+ val nodes = createClusterNodes(data, clusterStats)
+ // unpersist RDDs
+ data.unpersist()
+ updatedDataHistory.foreach(_.unpersist())
+
+ // build a cluster tree from the map of cluster stats
+ logInfo(s"Started building the cluster tree in ${sc.appName}")
+ val root = buildTree(nodes, ROOT_INDEX_KEY, this.k)
+ if (root.isEmpty) {
+ new SparkException("Failed to build a cluster tree from a Map type
of clusterStats")
+ }
+
+ // set the elapsed time for training
+ val finishTime = (System.currentTimeMillis() - startTime) / 1000.0
+ logInfo(s"Elapsed Time for ${this.getClass.getSimpleName} Training:
${finishTime} [sec]")
+
+ // make a bisecting kmeans model
+ val model = new BisectingKMeansModel(root.get)
+ val leavesNodes = model.getClusters
+ if (leavesNodes.length < this.k) {
+ logWarning(s"# clusters is less than you want: ${leavesNodes.length}
/ ${k}")
+ }
+ model
+ }
+}
+
+
+private[clustering] object BisectingKMeans {
+
+ import BisectingClusterStat._
+
+ val ROOT_INDEX_KEY: Long = 1
+
+ /**
+ * Finds the closest cluster center
+ *
+ * @param metric a distance metric
+ * @param centers centers of the clusters
+ * @param point a target point
+ * @return an index of the array of clusters
+ */
+ def findClosestCenter(metric: (BV[Double], BV[Double]) => Double)
+ (centers: Seq[BV[Double]])(point: BV[Double]): Int = {
+ // get the closest index
+ centers.zipWithIndex.map { case (center, idx) => (metric(center, point), idx) }.minBy(_._1)._2
+ }
+
+ /**
+ * Gets the minimum number of nodes in a tree by the number of leaves
+ *
+ * @param k the number of leaf nodes
+ */
+ def getMinimumNumNodesInTree(k: Int): Int = {
+ val multiplier = math.ceil(math.log(k) / math.log(2.0))
+ // the calculation is the same as `math.pow(2, multiplier + 1).toInt`
+ var numNodes = 2
+ (1 to multiplier.toInt).foreach (i => numNodes = numNodes << 1)
+ numNodes
+ }
+
+ /**
+ * Summarizes the data of each cluster as a Map
+ *
+ * @param data pairs of point and its cluster index
+ */
+ def summarizeClusters(data: RDD[(Long, BV[Double])]): Map[Long, BisectingClusterStat] = {
+
+ data.mapPartitions { iter =>
+ // accumulate the sums of all points in a partition and count the rows
+ val map = mutable.Map.empty[Long, (BV[Double], Double, BV[Double])]
+ iter.foreach { case (idx: Long, point: BV[Double]) =>
+ // get the existing stats of the cluster, or start from zero vectors
+ val (sumBV, n, sumOfSquares) = map
+ .getOrElse(idx, (BSV.zeros[Double](point.size), 0.0, BSV.zeros[Double](point.size)))
+ map(idx) = (sumBV + point, n + 1.0, sumOfSquares + (point :* point))
--- End diff --
We don't need a vector for `sumOfSquares` because we only use its sum to compute the
in-cluster variance: since sum_i ||x_i - mu||^2 = sum_i ||x_i||^2 - n * ||mu||^2, a single
scalar per cluster should work.
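As a minimal sketch of the idea (not code from this PR; `summarize` and `cost` are
hypothetical helper names, and this uses plain Breeze outside Spark), the scalar version
carries sum_i ||x_i||^2 per cluster and recovers the in-cluster variance from it:

```scala
import breeze.linalg.{DenseVector => BDV}

object ScalarSumOfSquaresSketch {

  // Aggregates (vector sum, count, scalar sum of squared norms) for one cluster.
  def summarize(points: Seq[BDV[Double]]): (BDV[Double], Double, Double) =
    points.foldLeft((BDV.zeros[Double](points.head.length), 0.0, 0.0)) {
      case ((sum, n, sumSq), p) => (sum + p, n + 1.0, sumSq + (p dot p))
    }

  // In-cluster variance from the scalar stats:
  //   sum_i ||x_i - mu||^2 = sumSq - n * ||mu||^2
  def cost(sum: BDV[Double], n: Double, sumSq: Double): Double = {
    val mu = sum / n
    sumSq - n * (mu dot mu)
  }

  def main(args: Array[String]): Unit = {
    val pts = Seq(BDV(1.0, 2.0), BDV(3.0, 4.0), BDV(5.0, 0.0))
    val (sum, n, sumSq) = summarize(pts)
    println(cost(sum, n, sumSq)) // 16.0, equal to summing squared distances to the centroid
  }
}
```

Carrying this scalar through the `mapPartitions` aggregation in place of the `sumOfSquares`
vector should also shrink the per-cluster state that gets merged across partitions.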