Repository: spark Updated Branches: refs/heads/branch-2.1 173c2387a -> 06e77e009
[SPARK-19319][BACKPORT-2.1][SPARKR] SparkR Kmeans summary returns error when the cluster size doesn't equal to k ## What changes were proposed in this pull request? Backport fix of #16666 ## How was this patch tested? Backport unit tests Author: wm...@hotmail.com <wm...@hotmail.com> Closes #16761 from wangmiao1981/kmeansport. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06e77e00 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06e77e00 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06e77e00 Branch: refs/heads/branch-2.1 Commit: 06e77e0097c6fa0accc5d9d6ce08a65a3828b878 Parents: 173c238 Author: wm...@hotmail.com <wm...@hotmail.com> Authored: Sun Feb 12 10:48:55 2017 -0800 Committer: Felix Cheung <felixche...@apache.org> Committed: Sun Feb 12 10:48:55 2017 -0800 ---------------------------------------------------------------------- R/pkg/R/mllib.R | 29 ++++++++++++++------ R/pkg/inst/tests/testthat/test_mllib.R | 27 ++++++++++++++++++ .../org/apache/spark/ml/r/KMeansWrapper.scala | 11 +++++++- 3 files changed, 58 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/06e77e00/R/pkg/R/mllib.R ---------------------------------------------------------------------- diff --git a/R/pkg/R/mllib.R b/R/pkg/R/mllib.R index 91ce669..1ddfa30 100644 --- a/R/pkg/R/mllib.R +++ b/R/pkg/R/mllib.R @@ -599,6 +599,10 @@ setMethod("summary", signature(object = "IsotonicRegressionModel"), #' @param k number of centers. #' @param maxIter maximum iteration number. #' @param initMode the initialization algorithm choosen to fit the model. +#' @param seed the random seed for cluster initialization +#' @param initSteps the number of steps for the k-means|| initialization mode. +#' This is an advanced setting, the default of 2 is almost always enough. Must be > 0. 
+#' @param tol convergence tolerance of iterations. #' @param ... additional argument(s) passed to the method. #' @return \code{spark.kmeans} returns a fitted k-means model. #' @rdname spark.kmeans @@ -628,11 +632,16 @@ setMethod("summary", signature(object = "IsotonicRegressionModel"), #' @note spark.kmeans since 2.0.0 #' @seealso \link{predict}, \link{read.ml}, \link{write.ml} setMethod("spark.kmeans", signature(data = "SparkDataFrame", formula = "formula"), - function(data, formula, k = 2, maxIter = 20, initMode = c("k-means||", "random")) { + function(data, formula, k = 2, maxIter = 20, initMode = c("k-means||", "random"), + seed = NULL, initSteps = 2, tol = 1E-4) { formula <- paste(deparse(formula), collapse = "") initMode <- match.arg(initMode) + if (!is.null(seed)) { + seed <- as.character(as.integer(seed)) + } jobj <- callJStatic("org.apache.spark.ml.r.KMeansWrapper", "fit", data@sdf, formula, - as.integer(k), as.integer(maxIter), initMode) + as.integer(k), as.integer(maxIter), initMode, seed, + as.integer(initSteps), as.numeric(tol)) new("KMeansModel", jobj = jobj) }) @@ -671,10 +680,13 @@ setMethod("fitted", signature(object = "KMeansModel"), #' @param object a fitted k-means model. #' @return \code{summary} returns summary information of the fitted model, which is a list. -#' The list includes the model's \code{k} (number of cluster centers), +#' The list includes the model's \code{k} (the configured number of cluster centers), #' \code{coefficients} (model cluster centers), -#' \code{size} (number of data points in each cluster), and \code{cluster} -#' (cluster centers of the transformed data). +#' \code{size} (number of data points in each cluster), \code{cluster} +#' (cluster centers of the transformed data), \code{is.loaded} (whether the model is loaded +#' from a saved file), and \code{clusterSize} +#' (the actual number of cluster centers. When using initMode = "random", +#' \code{clusterSize} may not equal \code{k}).
#' @rdname spark.kmeans #' @export #' @note summary(KMeansModel) since 2.0.0 @@ -686,16 +698,17 @@ setMethod("summary", signature(object = "KMeansModel"), coefficients <- callJMethod(jobj, "coefficients") k <- callJMethod(jobj, "k") size <- callJMethod(jobj, "size") - coefficients <- t(matrix(coefficients, ncol = k)) + clusterSize <- callJMethod(jobj, "clusterSize") + coefficients <- t(matrix(coefficients, ncol = clusterSize)) colnames(coefficients) <- unlist(features) - rownames(coefficients) <- 1:k + rownames(coefficients) <- 1:clusterSize cluster <- if (is.loaded) { NULL } else { dataFrame(callJMethod(jobj, "cluster")) } list(k = k, coefficients = coefficients, size = size, - cluster = cluster, is.loaded = is.loaded) + cluster = cluster, is.loaded = is.loaded, clusterSize = clusterSize) }) # Predicted values based on a k-means model http://git-wip-us.apache.org/repos/asf/spark/blob/06e77e00/R/pkg/inst/tests/testthat/test_mllib.R ---------------------------------------------------------------------- diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R index 3891f00..8fe3a87 100644 --- a/R/pkg/inst/tests/testthat/test_mllib.R +++ b/R/pkg/inst/tests/testthat/test_mllib.R @@ -375,6 +375,33 @@ test_that("spark.kmeans", { expect_true(summary2$is.loaded) unlink(modelPath) + + # Test Kmeans on dataset that is sensitive to seed value + col1 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0) + col2 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0) + col3 <- c(1, 2, 3, 4, 0, 1, 2, 3, 4, 0) + cols <- as.data.frame(cbind(col1, col2, col3)) + df <- createDataFrame(cols) + + model1 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10, + initMode = "random", seed = 1, tol = 1E-5) + model2 <- spark.kmeans(data = df, ~ ., k = 5, maxIter = 10, + initMode = "random", seed = 22222, tol = 1E-5) + + summary.model1 <- summary(model1) + summary.model2 <- summary(model2) + cluster1 <- summary.model1$cluster + cluster2 <- summary.model2$cluster + clusterSize1 <- 
summary.model1$clusterSize + clusterSize2 <- summary.model2$clusterSize + + # The predicted clusters are different + expect_equal(sort(collect(distinct(select(cluster1, "prediction")))$prediction), + c(0, 1, 2, 3)) + expect_equal(sort(collect(distinct(select(cluster2, "prediction")))$prediction), + c(0, 1, 2)) + expect_equal(clusterSize1, 4) + expect_equal(clusterSize2, 3) }) test_that("spark.mlp", { http://git-wip-us.apache.org/repos/asf/spark/blob/06e77e00/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala index ea94585..8d59686 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/r/KMeansWrapper.scala @@ -43,6 +43,8 @@ private[r] class KMeansWrapper private ( lazy val cluster: DataFrame = kMeansModel.summary.cluster + lazy val clusterSize: Int = kMeansModel.clusterCenters.size + def fitted(method: String): DataFrame = { if (method == "centers") { kMeansModel.summary.predictions.drop(kMeansModel.getFeaturesCol) @@ -68,7 +70,10 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] { formula: String, k: Int, maxIter: Int, - initMode: String): KMeansWrapper = { + initMode: String, + seed: String, + initSteps: Int, + tol: Double): KMeansWrapper = { val rFormula = new RFormula() .setFormula(formula) @@ -87,6 +92,10 @@ private[r] object KMeansWrapper extends MLReadable[KMeansWrapper] { .setMaxIter(maxIter) .setInitMode(initMode) .setFeaturesCol(rFormula.getFeaturesCol) + .setInitSteps(initSteps) + .setTol(tol) + + if (seed != null && seed.length > 0) kMeans.setSeed(seed.toInt) val pipeline = new Pipeline() .setStages(Array(rFormulaModel, kMeans)) --------------------------------------------------------------------- To unsubscribe, e-mail: 
commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org