Repository: spark
Updated Branches:
  refs/heads/branch-2.1 173c2387a -> 06e77e009

[SPARK-19319][BACKPORT-2.1][SPARKR] SparkR Kmeans summary returns error when
the cluster size doesn't equal k

## What changes were proposed in this pull request?

Backport fix of #16666

## How was this patch tested?

Backport unit tests

Author: <>

Closes #16761 from wangmiao1981/kmeansport.


Branch: refs/heads/branch-2.1
Commit: 06e77e0097c6fa0accc5d9d6ce08a65a3828b878
Parents: 173c238
Author: <>
Authored: Sun Feb 12 10:48:55 2017 -0800
Committer: Felix Cheung <>
Committed: Sun Feb 12 10:48:55 2017 -0800

 R/pkg/R/mllib.R                                 | 29 ++++++++++++++------
 R/pkg/inst/tests/testthat/test_mllib.R          | 27 ++++++++++++++++++
 .../org/apache/spark/ml/r/KMeansWrapper.scala   | 11 +++++++-
 3 files changed, 58 insertions(+), 9 deletions(-)
diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R
index 91ce669..1ddfa30 100644
--- a/R/pkg/R/mllib.R
+++ b/R/pkg/R/mllib.R
@@ -599,6 +599,10 @@ setMethod("summary", signature(object = 
 #' @param k number of centers.
 #' @param maxIter maximum iteration number.
 #' @param initMode the initialization algorithm chosen to fit the model.
+#' @param seed the random seed for cluster initialization
+#' @param initSteps the number of steps for the k-means|| initialization mode.
+#'                  This is an advanced setting; the default of 2 is almost always enough. Must be > 0.
+#' @param tol convergence tolerance of iterations.
 #' @param ... additional argument(s) passed to the method.
 #' @return \code{spark.kmeans} returns a fitted k-means model.
 #' @rdname spark.kmeans
@@ -628,11 +632,16 @@ setMethod("summary", signature(object = 
 #' @note spark.kmeans since 2.0.0
 #' @seealso \link{predict}, \link{}, \link{}
 setMethod("spark.kmeans", signature(data = "SparkDataFrame", formula = 
-          function(data, formula, k = 2, maxIter = 20, initMode = 
c("k-means||", "random")) {
+          function(data, formula, k = 2, maxIter = 20, initMode = 
c("k-means||", "random"),
+                   seed = NULL, initSteps = 2, tol = 1E-4) {
             formula <- paste(deparse(formula), collapse = "")
             initMode <- match.arg(initMode)
+            if (!is.null(seed)) {
+              seed <- as.character(as.integer(seed))
+            }
             jobj <- callJStatic("", "fit", 
data@sdf, formula,
-                                as.integer(k), as.integer(maxIter), initMode)
+                                as.integer(k), as.integer(maxIter), initMode, 
+                                as.integer(initSteps), as.numeric(tol))
             new("KMeansModel", jobj = jobj)
@@ -671,10 +680,13 @@ setMethod("fitted", signature(object = "KMeansModel"),
 #' @param object a fitted k-means model.
 #' @return \code{summary} returns summary information of the fitted model, 
which is a list.
-#'         The list includes the model's \code{k} (number of cluster centers),
+#'         The list includes the model's \code{k} (the configured number of 
cluster centers),
 #'         \code{coefficients} (model cluster centers),
-#'         \code{size} (number of data points in each cluster), and 
-#'         (cluster centers of the transformed data).
+#'         \code{size} (number of data points in each cluster), \code{cluster}
+#'         (cluster centers of the transformed data), \code{is.loaded} (whether the model is loaded
+#'         from a saved file), and \code{clusterSize}
+#'         (the actual number of cluster centers. When using initMode = "random",
+#'         \code{clusterSize} may not be equal to \code{k}).
 #' @rdname spark.kmeans
 #' @export
 #' @note summary(KMeansModel) since 2.0.0
@@ -686,16 +698,17 @@ setMethod("summary", signature(object = "KMeansModel"),
             coefficients <- callJMethod(jobj, "coefficients")
             k <- callJMethod(jobj, "k")
             size <- callJMethod(jobj, "size")
-            coefficients <- t(matrix(coefficients, ncol = k))
+            clusterSize <- callJMethod(jobj, "clusterSize")
+            coefficients <- t(matrix(coefficients, ncol = clusterSize))
             colnames(coefficients) <- unlist(features)
-            rownames(coefficients) <- 1:k
+            rownames(coefficients) <- 1:clusterSize
             cluster <- if (is.loaded) {
             } else {
               dataFrame(callJMethod(jobj, "cluster"))
             list(k = k, coefficients = coefficients, size = size,
-                 cluster = cluster, is.loaded = is.loaded)
+                 cluster = cluster, is.loaded = is.loaded, clusterSize = 
 #  Predicted values based on a k-means model
diff --git a/R/pkg/inst/tests/testthat/test_mllib.R 
index 3891f00..8fe3a87 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -375,6 +375,33 @@ test_that("spark.kmeans", {
+  # Test Kmeans on dataset that is sensitive to seed value
+  col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
+  col2 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
+  col3 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0)
+  cols <-, col2, col3))
+  df <- createDataFrame(cols)
+  model1 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
+                         initMode = "random", seed = 1, tol = 1E-5)
+  model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10,
+                         initMode = "random", seed = 22222, tol = 1E-5)
+  summary.model1 <- summary(model1)
+  summary.model2 <- summary(model2)
+  cluster1 <- summary.model1$cluster
+  cluster2 <- summary.model2$cluster
+  clusterSize1 <- summary.model1$clusterSize
+  clusterSize2 <- summary.model2$clusterSize
+  # The predicted clusters are different
+  expect_equal(sort(collect(distinct(select(cluster1, 
+               c(0, 1, 2, 3))
+  expect_equal(sort(collect(distinct(select(cluster2, 
+               c(0, 1, 2))
+  expect_equal(clusterSize1, 4)
+  expect_equal(clusterSize2, 3)
 test_that("spark.mlp", {
diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala 
index ea94585..8d59686 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala
@@ -43,6 +43,8 @@ private[r] class KMeansWrapper private (
   lazy val cluster: DataFrame = kMeansModel.summary.cluster
+  lazy val clusterSize: Int = kMeansModel.clusterCenters.size
   def fitted(method: String): DataFrame = {
     if (method == "centers") {
@@ -68,7 +70,10 @@ private[r] object KMeansWrapper extends 
MLReadable[KMeansWrapper] {
       formula: String,
       k: Int,
       maxIter: Int,
-      initMode: String): KMeansWrapper = {
+      initMode: String,
+      seed: String,
+      initSteps: Int,
+      tol: Double): KMeansWrapper = {
     val rFormula = new RFormula()
@@ -87,6 +92,10 @@ private[r] object KMeansWrapper extends 
MLReadable[KMeansWrapper] {
+      .setInitSteps(initSteps)
+      .setTol(tol)
+    if (seed != null && seed.length > 0) kMeans.setSeed(seed.toInt)
     val pipeline = new Pipeline()
       .setStages(Array(rFormulaModel, kMeans))

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to