xwu99 commented on a change in pull request #28229: URL: https://github.com/apache/spark/pull/28229#discussion_r429522779
########## File path: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeansBlocksImpl.scala ########## @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.internal.Logging +import org.apache.spark.ml.feature.InstanceBlock +import org.apache.spark.ml.util.Instrumentation +import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix, Vector, Vectors} +import org.apache.spark.mllib.linalg.BLAS.axpy +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.Utils +import org.apache.spark.util.random.XORShiftRandom + +/** + * K-means clustering using RDD[InstanceBlock] as input format. 
+ */ +class KMeansBlocksImpl ( + private var k: Int, + private var maxIterations: Int, + private var initializationMode: String, + private var initializationSteps: Int, + private var epsilon: Double, + private var seed: Long, + private var distanceMeasure: String) extends Serializable with Logging { + + /** + * Constructs a KMeans instance with default parameters: {k: 2, maxIterations: 20, + * initializationMode: "k-means||", initializationSteps: 2, epsilon: 1e-4, seed: random, + * distanceMeasure: "euclidean"}. + */ + def this() = this(2, 20, KMeansBlocksImpl.K_MEANS_PARALLEL, 2, 1e-4, Utils.random.nextLong(), + DistanceMeasure.EUCLIDEAN) + + // Initial cluster centers can be provided as a KMeansModel object rather than using the + // random or k-means|| initializationMode + private var initialModel: Option[KMeansModel] = None + + private def VectorsToDenseMatrix(vectors: Array[VectorWithNorm]): DenseMatrix = { + + val v: Array[Vector] = vectors.map(_.vector) + + VectorsToDenseMatrix(v.toIterator) + } + + private def VectorsToDenseMatrix(vectors: Iterator[Vector]): DenseMatrix = { + val vector_array = vectors.toArray + val column_num = vector_array(0).size + val row_num = vector_array.length + + val values = new Array[Double](row_num * column_num) + var rowIndex = 0 + + // convert to column-major dense matrix + for (vector <- vector_array) { + for ((value, index) <- vector.toArray.zipWithIndex) { + values(index * row_num + rowIndex) = value + } + rowIndex = rowIndex + 1 + } + + new DenseMatrix(row_num, column_num, values) + } + + private def computeSquaredDistances(points_matrix: DenseMatrix, + points_square_sums: DenseMatrix, + centers_matrix: DenseMatrix, + centers_square_sums: DenseMatrix): DenseMatrix = { + // (x - y)^2 = x^2 + y^2 - 2 * x * y + + // Add up squared sums of points and centers (x^2 + y^2) + val ret: DenseMatrix = computeMatrixSum(points_square_sums, centers_square_sums) + + // use GEMM to compute squared distances, (2*x*y) can be decomposed to 
matrix multiply + val alpha = -2.0 + val beta = 1.0 + BLAS.gemm(alpha, points_matrix, centers_matrix.transpose, beta, ret) + + ret + } + + private def computePointsSquareSum(points_matrix: DenseMatrix, + centers_num: Int): DenseMatrix = { + val points_num = points_matrix.numRows + val ret = DenseMatrix.zeros(points_num, centers_num) + for ((row, index) <- points_matrix.rowIter.zipWithIndex) { Review comment: I can access the raw value array of the matrix, but I didn't find a way in Java to use a subarray without copying it and then pass it to BLAS.axpy. Might adding the values individually be faster? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
