Repository: flink Updated Branches: refs/heads/master 88576a0ee -> 1753b1d25
[FLINK-5426] [ml] Clean up the Flink Machine Learning library Removed duplicate tests, improved scaladoc and naming, removed typos in scaladoc, introduced and improved use of constants, improved test-case naming. This closes #3081. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1753b1d2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1753b1d2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1753b1d2 Branch: refs/heads/master Commit: 1753b1d25b4d943c1ffbe7c3fb40e56b840c4f7d Parents: 88576a0 Author: Fokko Driesprong <fokkodriespr...@godatadriven.com> Authored: Fri Jan 6 21:34:53 2017 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Mon Jan 16 13:16:45 2017 +0100 ---------------------------------------------------------------------- .../apache/flink/ml/classification/SVM.scala | 28 ++++++------ .../org/apache/flink/ml/common/Block.scala | 2 +- .../apache/flink/ml/common/FlinkMLTools.scala | 29 ++++++------ .../scala/org/apache/flink/ml/math/BLAS.scala | 14 +++--- .../scala/org/apache/flink/ml/math/Breeze.scala | 2 +- .../flink/ml/math/BreezeVectorConverter.scala | 2 +- .../org/apache/flink/ml/math/DenseMatrix.scala | 30 ++++++------ .../org/apache/flink/ml/math/DenseVector.scala | 17 ++++--- .../scala/org/apache/flink/ml/math/Matrix.scala | 4 +- .../org/apache/flink/ml/math/SparseMatrix.scala | 26 +++++------ .../org/apache/flink/ml/math/SparseVector.scala | 48 ++++++++++---------- .../scala/org/apache/flink/ml/math/Vector.scala | 13 +++--- .../math/distributed/DistributedRowMatrix.scala | 13 +++++- .../org/apache/flink/ml/math/package.scala | 8 ++-- .../main/scala/org/apache/flink/ml/nn/KNN.scala | 2 +- .../ml/optimization/PartialLossFunction.scala | 2 +- .../apache/flink/ml/optimization/Solver.scala | 2 +- .../apache/flink/ml/math/DenseVectorSuite.scala | 38 +++------------- .../PolynomialFeaturesITSuite.scala | 40 ++++++++-------- 
.../preprocessing/StandardScalerITSuite.scala | 6 +-- .../flink/ml/recommendation/ALSITSuite.scala | 11 ++--- .../MultipleLinearRegressionITSuite.scala | 2 +- 22 files changed, 162 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala index c9544f9..721dd69 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala @@ -139,7 +139,7 @@ class SVM extends Predictor[SVM] { /** Sets the number of data blocks/partitions * - * @param blocks + * @param blocks the number of blocks into which the input data will be split. 
* @return itself */ def setBlocks(blocks: Int): SVM = { @@ -149,7 +149,7 @@ class SVM extends Predictor[SVM] { /** Sets the number of outer iterations * - * @param iterations + * @param iterations the maximum number of iterations of the outer loop method * @return itself */ def setIterations(iterations: Int): SVM = { @@ -159,8 +159,8 @@ class SVM extends Predictor[SVM] { /** Sets the number of local SDCA iterations * - * @param localIterations - * @return itselft + * @param localIterations the maximum number of SDCA iterations + * @return itself */ def setLocalIterations(localIterations: Int): SVM = { parameters.add(LocalIterations, localIterations) @@ -169,7 +169,7 @@ class SVM extends Predictor[SVM] { /** Sets the regularization constant * - * @param regularization + * @param regularization the regularization constant of the SVM algorithm * @return itself */ def setRegularization(regularization: Double): SVM = { @@ -179,7 +179,7 @@ class SVM extends Predictor[SVM] { /** Sets the stepsize for the weight vector updates * - * @param stepsize + * @param stepsize the initial step size for the updates of the weight vector * @return itself */ def setStepsize(stepsize: Double): SVM = { @@ -189,7 +189,7 @@ class SVM extends Predictor[SVM] { /** Sets the seed value for the random number generator * - * @param seed + * @param seed the seed to initialize the random number generator * @return itself */ def setSeed(seed: Long): SVM = { @@ -201,8 +201,9 @@ class SVM extends Predictor[SVM] { * * The [[predict ]] and [[evaluate]] functions will return +1.0 for items with a decision * function value above this threshold, and -1.0 for items below it. 
- * @param threshold - * @return + * @param threshold the limiting value for the decision function above which examples are + * labeled as positive + * @return itself */ def setThreshold(threshold: Double): SVM = { parameters.add(ThresholdValue, threshold) @@ -219,6 +220,7 @@ class SVM extends Predictor[SVM] { * @param outputDecisionFunction When set to true, [[predict ]] and [[evaluate]] return the raw * decision function values. When set to false, they return the * thresholded binary values (+1.0, -1.0). + * @return itself */ def setOutputDecisionFunction(outputDecisionFunction: Boolean): SVM = { parameters.add(OutputDecisionFunction, outputDecisionFunction) @@ -231,7 +233,7 @@ class SVM extends Predictor[SVM] { */ object SVM{ - val WEIGHT_VECTOR ="weightVector" + val WEIGHT_VECTOR_BROADCAST_NAME = "weightVector" // ========================================== Parameters ========================================= @@ -277,7 +279,7 @@ object SVM{ /** Provides the operation that makes the predictions for individual examples. * - * @tparam T + * @tparam T Input data type which is a subtype of [[Vector]] * @return A PredictOperation, through which it is possible to predict a value, given a * feature vector */ @@ -432,7 +434,7 @@ object SVM{ var r: Random = _ override def open(parameters: Configuration): Unit = { - originalW = getRuntimeContext.getBroadcastVariable(WEIGHT_VECTOR).get(0) + originalW = getRuntimeContext.getBroadcastVariable(WEIGHT_VECTOR_BROADCAST_NAME).get(0) if(r == null){ r = new Random(seed ^ getRuntimeContext.getIndexOfThisSubtask) @@ -497,7 +499,7 @@ object SVM{ } } - blockedInputNumberElements.map(localSDCA).withBroadcastSet(w, WEIGHT_VECTOR) + blockedInputNumberElements.map(localSDCA).withBroadcastSet(w, WEIGHT_VECTOR_BROADCAST_NAME) } /** Maximizes the dual problem using hinge loss functions. 
It returns the alpha and weight http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/Block.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/Block.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/Block.scala index 1af77ea..9c55534 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/Block.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/Block.scala @@ -26,4 +26,4 @@ package org.apache.flink.ml.common * @param values * @tparam T */ -case class Block[T](index: Int, values: Vector[T]) {} +case class Block[T](index: Int, values: Vector[T]) http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala index 553ec00..bfc72a4 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala @@ -28,7 +28,7 @@ import org.apache.flink.core.fs.Path import scala.reflect.ClassTag -/** FlinkTools contains a set of convenience functions for Flink's machine learning library: +/** FlinkMLTools contains a set of convenience functions for Flink's machine learning library: * * - persist: * Takes up to 5 [[DataSet]]s and file paths. 
Each [[DataSet]] is written to the specified @@ -41,10 +41,11 @@ import scala.reflect.ClassTag * */ object FlinkMLTools { + val EXECUTION_ENVIRONMENT_NAME = "FlinkMLTools persist" /** Registers the different FlinkML related types for Kryo serialization * - * @param env + * @param env The Flink execution environment where the types need to be registered */ def registerFlinkMLTypes(env: ExecutionEnvironment): Unit = { @@ -91,7 +92,7 @@ object FlinkMLTools { outputFormat.setWriteMode(WriteMode.OVERWRITE) dataset.output(outputFormat) - env.execute("FlinkTools persist") + env.execute(EXECUTION_ENVIRONMENT_NAME) val inputFormat = new TypeSerializerInputFormat[T](dataset.getType) inputFormat.setFilePath(filePath) @@ -111,7 +112,7 @@ object FlinkMLTools { * @return Tuple of [[DataSet]]s reading the just written files */ def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation](ds1: DataSet[A], ds2: - DataSet[B], path1: String, path2: String):(DataSet[A], DataSet[B]) = { + DataSet[B], path1: String, path2: String): (DataSet[A], DataSet[B]) = { val env = ds1.getExecutionEnvironment val f1 = new Path(path1) @@ -130,7 +131,7 @@ object FlinkMLTools { ds2.output(of2) - env.execute("FlinkTools persist") + env.execute(EXECUTION_ENVIRONMENT_NAME) val if1 = new TypeSerializerInputFormat[A](ds1.getType) if1.setFilePath(f1) @@ -184,7 +185,7 @@ object FlinkMLTools { ds3.output(of3) - env.execute("FlinkTools persist") + env.execute(EXECUTION_ENVIRONMENT_NAME) val if1 = new TypeSerializerInputFormat[A](ds1.getType) if1.setFilePath(f1) @@ -255,7 +256,7 @@ object FlinkMLTools { ds4.output(of4) - env.execute("FlinkTools persist") + env.execute(EXECUTION_ENVIRONMENT_NAME) val if1 = new TypeSerializerInputFormat[A](ds1.getType) if1.setFilePath(f1) @@ -340,7 +341,7 @@ object FlinkMLTools { ds5.output(of5) - env.execute("FlinkTools persist") + env.execute(EXECUTION_ENVIRONMENT_NAME) val if1 = new TypeSerializerInputFormat[A](ds1.getType) if1.setFilePath(f1) @@ -363,11 +364,11 @@ 
object FlinkMLTools { /** Groups the DataSet input into numBlocks blocks. * - * @param input + * @param input the input dataset * @param numBlocks Number of Blocks * @param partitionerOption Optional partitioner to control the partitioning - * @tparam T - * @return + * @tparam T Type of the [[DataSet]]'s elements + * @return The different datasets grouped into blocks */ def block[T: TypeInformation: ClassTag]( input: DataSet[T], @@ -395,10 +396,10 @@ object FlinkMLTools { } preGroupBlockIDInput.groupBy(0).reduceGroup { - iter => { - val array = iter.toVector + iterator => { + val array = iterator.toVector - val blockID = array(0)._1 + val blockID = array.head._1 val elements = array.map(_._2) Block[T](blockID, elements) http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala index 8ea3b65..d134623 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala @@ -77,7 +77,7 @@ object BLAS extends Serializable { val xValues = x.data val xIndices = x.indices val yValues = y.data - val nnz = xIndices.size + val nnz = xIndices.length if (a == 1.0) { var k = 0 @@ -130,7 +130,7 @@ object BLAS extends Serializable { val xValues = x.data val xIndices = x.indices val yValues = y.data - val nnz = xIndices.size + val nnz = xIndices.length var sum = 0.0 var k = 0 @@ -149,8 +149,8 @@ object BLAS extends Serializable { val xIndices = x.indices val yValues = y.data val yIndices = y.indices - val nnzx = xIndices.size - val nnzy = yIndices.size + val nnzx = xIndices.length + val nnzy = yIndices.length var kx = 0 var ky = 0 @@ -183,7 +183,7 @@ object BLAS extends 
Serializable { val sxIndices = sx.indices val sxValues = sx.data val dyValues = dy.data - val nnz = sxIndices.size + val nnz = sxIndices.length var i = 0 var k = 0 @@ -215,9 +215,9 @@ object BLAS extends Serializable { def scal(a: Double, x: Vector): Unit = { x match { case sx: SparseVector => - f2jBLAS.dscal(sx.data.size, a, sx.data, 1) + f2jBLAS.dscal(sx.data.length, a, sx.data, 1) case dx: DenseVector => - f2jBLAS.dscal(dx.data.size, a, dx.data, 1) + f2jBLAS.dscal(dx.data.length, a, dx.data, 1) case _ => throw new IllegalArgumentException(s"scal doesn't support vector type ${x.getClass}.") } http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala index 74d4d8f..5058ec3 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala @@ -18,7 +18,7 @@ package org.apache.flink.ml.math -import breeze.linalg.{ Matrix => BreezeMatrix, DenseMatrix => BreezeDenseMatrix, +import breeze.linalg.{Matrix => BreezeMatrix, DenseMatrix => BreezeDenseMatrix, CSCMatrix => BreezeCSCMatrix, DenseVector => BreezeDenseVector, SparseVector => BreezeSparseVector, Vector => BreezeVector} http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala index 0bb24f3..dac8fb2 100644 --- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala @@ -22,7 +22,7 @@ import breeze.linalg.{Vector => BreezeVector} /** Type class which allows the conversion from Breeze vectors to Flink vectors * - * @tparam T Resulting type of the conversion + * @tparam T Resulting type of the conversion, subtype of [[Vector]] */ trait BreezeVectorConverter[T <: Vector] extends Serializable { /** Converts a Breeze vector into a Flink vector of type T http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala index 4ae565e..80d1dc3 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala @@ -26,12 +26,8 @@ package org.apache.flink.ml.math * @param numCols Number of columns * @param data Array of matrix elements in column major order */ -case class DenseMatrix( - val numRows: Int, - val numCols: Int, - val data: Array[Double]) - extends Matrix - with Serializable{ +case class DenseMatrix(numRows: Int, numCols: Int, data: Array[Double]) + extends Matrix with Serializable { import DenseMatrix._ @@ -55,20 +51,18 @@ case class DenseMatrix( val result = StringBuilder.newBuilder result.append(s"DenseMatrix($numRows, $numCols)\n") - val linewidth = LINE_WIDTH - val columnsFieldWidths = for(row <- 0 until math.min(numRows, MAX_ROWS)) yield { var column = 0 var maxFieldWidth = 0 - while(column * maxFieldWidth < linewidth && column < numCols) { + while(column * maxFieldWidth < LINE_WIDTH && column 
< numCols) { val fieldWidth = printEntry(row, column).length + 2 if(fieldWidth > maxFieldWidth) { maxFieldWidth = fieldWidth } - if(column * maxFieldWidth < linewidth) { + if(column * maxFieldWidth < LINE_WIDTH) { column += 1 } } @@ -128,6 +122,10 @@ case class DenseMatrix( data(index) = value } + /** Converts the DenseMatrix to a SparseMatrix + * + * @return SparseMatrix build from all the non-null values + */ def toSparseMatrix: SparseMatrix = { val entries = for(row <- 0 until numRows; col <- 0 until numCols) yield { (row, col, apply(row, col)) @@ -138,9 +136,9 @@ case class DenseMatrix( /** Calculates the linear index of the respective matrix entry * - * @param row - * @param col - * @return + * @param row row index + * @param col column index + * @return the index of the value according to the row and index */ private def locate(row: Int, col: Int): Int = { require(0 <= row && row < numRows && 0 <= col && col < numCols, @@ -151,9 +149,9 @@ case class DenseMatrix( /** Converts the entry at (row, col) to string * - * @param row - * @param col - * @return + * @param row row index + * @param col column index + * @return Takes the value according to the row and index and convert it to string */ private def printEntry(row: Int, col: Int): String = { val index = locate(row, col) http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala index 5e70741..c0c37bb 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala @@ -26,14 +26,11 @@ import breeze.linalg.{SparseVector => BreezeSparseVector, DenseVector => 
BreezeD * * @param data Array of doubles to store the vector elements */ -case class DenseVector( - data: Array[Double]) - extends Vector - with Serializable { +case class DenseVector(data: Array[Double]) extends Vector with Serializable { /** * Number of elements in a vector - * @return + * @return the number of the elements in the vector */ override def size: Int = { data.length @@ -136,10 +133,16 @@ case class DenseVector( /** Magnitude of a vector * - * @return + * @return The length of the vector */ - override def magnitude: Double = math.sqrt(data.map(x => x * x).sum) + override def magnitude: Double = { + math.sqrt(data.map(x => x * x).sum) + } + /** Convert to a [[SparseVector]] + * + * @return Creates a SparseVector from the DenseVector + */ def toSparseVector: SparseVector = { val nonZero = (0 until size).zip(data).filter(_._2 != 0) http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala index ba6a781..ea2b4cd 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala @@ -25,13 +25,13 @@ trait Matrix { /** Number of rows * - * @return + * @return number of rows in the matrix */ def numRows: Int /** Number of columns * - * @return + * @return number of columns in the matrix */ def numCols: Int http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala index fe58ddb..d7dd9dc 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala @@ -47,7 +47,6 @@ class SparseMatrix( * @return matrix entry at (row, col) */ override def apply(row: Int, col: Int): Double = { - val index = locate(row, col) if(index < 0){ @@ -152,10 +151,11 @@ object SparseMatrix{ /** Constructs a sparse matrix from a coordinate list (COO) representation where each entry * is stored as a tuple of (rowIndex, columnIndex, value). - * @param numRows - * @param numCols - * @param entries - * @return + * + * @param numRows Number of rows + * @param numCols Number of columns + * @param entries Data entries in the matrix + * @return Newly constructed sparse matrix */ def fromCOO(numRows: Int, numCols: Int, entries: (Int, Int, Double)*): SparseMatrix = { fromCOO(numRows, numCols, entries) @@ -164,10 +164,10 @@ object SparseMatrix{ /** Constructs a sparse matrix from a coordinate list (COO) representation where each entry * is stored as a tuple of (rowIndex, columnIndex, value). * - * @param numRows - * @param numCols - * @param entries - * @return + * @param numRows Number of rows + * @param numCols Number of columns + * @param entries Data entries in the matrix + * @return Newly constructed sparse matrix */ def fromCOO(numRows: Int, numCols: Int, entries: Iterable[(Int, Int, Double)]): SparseMatrix = { val entryArray = entries.toArray @@ -256,10 +256,10 @@ object SparseMatrix{ * cannot infer that the tuple has to be of type (Int, Int, Double) because of the overloading * with the Iterable type. 
* - * @param numRows - * @param numCols - * @param entry - * @return + * @param numRows Number of rows + * @param numCols Number of columns + * @param entry Data entries in the matrix + * @return Newly constructed sparse matrix */ def fromCOO(numRows: Int, numCols: Int, entry: (Int, Int, Int)): SparseMatrix = { fromCOO(numRows, numCols, (entry._1, entry._2, entry._3.toDouble)) http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala index fec018f..29c16cc 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala @@ -25,12 +25,8 @@ import scala.util.Sorting /** Sparse vector implementation storing the data in two arrays. 
One index contains the sorted * indices of the non-zero vector entries and the other the corresponding vector entries */ -case class SparseVector( - size: Int, - indices: Array[Int], - data: Array[Double]) - extends Vector - with Serializable { +case class SparseVector(size: Int, indices: Array[Int], data: Array[Double]) + extends Vector with Serializable { /** Updates the element at the given index with the provided value * @@ -41,8 +37,8 @@ case class SparseVector( val resolvedIndex = locate(index) if (resolvedIndex < 0) { - throw new IllegalArgumentException("Cannot update zero value of sparse vector at index " + - index) + throw new IllegalArgumentException("Cannot update zero value of sparse vector at " + + s"index $index") } else { data(resolvedIndex) = value } @@ -50,7 +46,7 @@ case class SparseVector( /** Copies the vector instance * - * @return Copy of the vector instance + * @return Copy of the [[SparseVector]] instance */ override def copy: SparseVector = { new SparseVector(size, indices.clone, data.clone) @@ -86,13 +82,11 @@ case class SparseVector( } } - /** Returns the outer product (a.k.a. Kronecker product) of `this` - * with `other`. The result is given in [[org.apache.flink.ml.math.SparseMatrix]] - * representation. + /** Returns the outer product (a.k.a. Kronecker product) of `this` with `other`. The result is + * given in [[SparseMatrix]] representation. 
* - * @param other a Vector - * @return the [[org.apache.flink.ml.math.SparseMatrix]] which equals the outer product of `this` - * with `other.` + * @param other a [[Vector]] + * @return the [[SparseMatrix]] which equals the outer product of `this` with `other.` */ override def outer(other: Vector): SparseMatrix = { val numRows = size @@ -121,7 +115,7 @@ case class SparseVector( /** Magnitude of a vector * - * @return + * @return The length of the vector */ override def magnitude: Double = math.sqrt(data.map(x => x * x).sum) @@ -140,6 +134,10 @@ case class SparseVector( } } + /** Converts the [[SparseVector]] to a [[DenseVector]] + * + * @return The DenseVector out of the SparseVector + */ def toDenseVector: DenseVector = { val denseVector = DenseVector.zeros(size) @@ -182,9 +180,9 @@ object SparseVector { /** Constructs a sparse vector from a coordinate list (COO) representation where each entry * is stored as a tuple of (index, value). * - * @param size - * @param entries - * @return + * @param size The number of elements in the vector + * @param entries The values in the vector + * @return a new [[SparseVector]] */ def fromCOO(size: Int, entries: (Int, Double)*): SparseVector = { fromCOO(size, entries) @@ -193,9 +191,9 @@ object SparseVector { /** Constructs a sparse vector from a coordinate list (COO) representation where each entry * is stored as a tuple of (index, value). * - * @param size - * @param entries - * @return + * @param size The number of elements in the vector + * @param entries An iterator supplying the values in the vector + * @return a new [[SparseVector]] */ def fromCOO(size: Int, entries: Iterable[(Int, Double)]): SparseVector = { val entryArray = entries.toArray @@ -255,9 +253,9 @@ object SparseVector { * type inference mechanism cannot infer that the second tuple value has to be of type Double * if only a single tuple is provided. 
* - * @param size - * @param entry - * @return + * @param size The number of elements in the vector + * @param entry The value in the vector + * @return a new [[SparseVector]] */ def fromCOO(size: Int, entry: (Int, Int)): SparseVector = { fromCOO(size, (entry._1, entry._2.toDouble)) http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala index e52328d..0c911d0 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala @@ -27,21 +27,21 @@ trait Vector extends Serializable { /** Number of elements in a vector * - * @return + * @return The number of elements of the vector */ def size: Int /** Element wise access function * - * * @param index index of the accessed element - * @return element with index + * @param index index of the accessed element + * @return value of the associated with the index */ def apply(index: Int): Double /** Updates the element at the given index with the provided value * - * @param index - * @param value + * @param index The index of the element to be updated + * @param value The new value */ def update(index: Int, value: Double): Unit @@ -60,7 +60,6 @@ trait Vector extends Serializable { /** Returns the outer product of the recipient and the argument * - * * @param other a Vector * @return a matrix */ @@ -68,7 +67,7 @@ trait Vector extends Serializable { /** Magnitude of a vector * - * @return + * @return The length of the vector */ def magnitude: Double 
http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala index 9092e5c..5dca01a 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala @@ -19,9 +19,9 @@ package org.apache.flink.ml.math.distributed import org.apache.flink.api.scala._ +import org.apache.flink.ml.math._ import org.apache.flink.ml.math.Breeze._ import org.apache.flink.ml.math.distributed.DistributedMatrix._ -import org.apache.flink.ml.math._ /** Represents distributed row-major matrix. * @@ -37,6 +37,8 @@ class DistributedRowMatrix( /** Collects the data in the form of a sequence of coordinates associated with their values. * This operation immediately triggers program execution. + * + * @return Returns the matrix in the sparse coordinate format */ def toCOO: Seq[(MatrixRowIndex, MatrixColIndex, Double)] = { val localRows = data.collect() @@ -49,6 +51,8 @@ class DistributedRowMatrix( /** Collects the data in the form of a SparseMatrix. This operation immediately triggers program * execution. + * + * @return Returns the matrix as a local [[SparseMatrix]] */ def toLocalSparseMatrix: SparseMatrix = { val localMatrix = SparseMatrix.fromCOO(this.numRows, this.numCols, this.toCOO) @@ -61,6 +65,8 @@ class DistributedRowMatrix( // TODO: convert to dense representation on the distributed matrix and collect it afterward /** Collects the data in the form of a DenseMatrix. This operation immediately triggers program * execution. 
+ * + * @return Returns the matrix as a [[DenseMatrix]] */ def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix @@ -68,6 +74,7 @@ class DistributedRowMatrix( * * @param func a function to be applied * @param other a [[DistributedRowMatrix]] to apply the function together + * @return Applies the function and returns a new [[DistributedRowMatrix]] */ def byRowOperation( func: (Vector, Vector) => Vector, @@ -101,6 +108,7 @@ class DistributedRowMatrix( /** Adds this matrix to another matrix. * * @param other a [[DistributedRowMatrix]] to be added + * @return [[DistributedRowMatrix]] representing the two matrices added. */ def add(other: DistributedRowMatrix): DistributedRowMatrix = { val addFunction = (x: Vector, y: Vector) => (x.asBreeze + y.asBreeze).fromBreeze @@ -110,6 +118,8 @@ class DistributedRowMatrix( /** Subtracts another matrix from this matrix. * * @param other a [[DistributedRowMatrix]] to be subtracted from this matrix + * @return [[DistributedRowMatrix]] representing the original matrix subtracted by the supplied + * matrix. */ def subtract(other: DistributedRowMatrix): DistributedRowMatrix = { val subFunction = (x: Vector, y: Vector) => (x.asBreeze - y.asBreeze).fromBreeze @@ -127,6 +137,7 @@ object DistributedRowMatrix { * @param numCols Number of columns * @param isSorted If false, sorts the row to properly build the matrix representation. * If already sorted, set this parameter to true to skip sorting. 
+ * @return the [[DistributedRowMatrix]] build from the original coordinate matrix */ def fromCOO(data: DataSet[(MatrixRowIndex, MatrixColIndex, Double)], numRows: Int, http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala index 4c7f254..7493602 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala @@ -89,22 +89,22 @@ package object math { /** Stores the vector values in a dense array * - * @param vector + * @param vector Subtype of [[Vector]] * @return Array containing the vector values */ def vector2Array(vector: Vector): Array[Double] = { vector match { case dense: DenseVector => dense.data.clone - case sparse: SparseVector => + case sparse: SparseVector => { val result = new Array[Double](sparse.size) - for((index, value) <- sparse) { + for ((index, value) <- sparse) { result(index) = value } result - + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala index d15fdaf..527e636 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala @@ -131,7 +131,7 @@ class KNN extends Predictor[KNN] { /** Parameter a user can specify if one of the training or test sets are small * - * @param sizeHint + * @param 
sizeHint cross hint tells the system which sizes to expect from the data sets */ def setSizeHint(sizeHint: CrossHint): KNN = { parameters.add(SizeHint, sizeHint) http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala index 5cf69b6..ac0053e 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala @@ -62,6 +62,6 @@ object SquaredLoss extends PartialLossFunction { * @return */ override def derivative(prediction: Double, label: Double): Double = { - (prediction - label) + prediction - label } } http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala index 3bad038..ee91bd1 100644 --- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala +++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala @@ -80,7 +80,7 @@ abstract class Solver extends Serializable with WithParameters { dimensionDS.map { dimension => val values = Array.fill(dimension)(0.0) - new WeightVector(DenseVector(values), 0.0) + WeightVector(DenseVector(values), 0.0) } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala index aa1c4f9..bb5fd45 100644 --- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala @@ -110,34 +110,28 @@ class DenseVectorSuite extends FlatSpec with Matchers { vec1.outer(vec2) should be(SparseMatrix.fromCOO(5, 5, entries)) } - - - it should s"""calculate right outer product with DenseVector - |with one-dimensional unit DenseVector as identity""".stripMargin in { + it should "DenseVector right outer product with one-dimensional DenseVector as identity" in { val vec = DenseVector(Array(1, 0, 1, 0, 0)) val unit = DenseVector(1) vec.outer(unit) should equal(DenseMatrix(vec.size, 1, vec.data)) } - it should s"""calculate right outer product with DenseVector - |with one-dimensional unit SparseVector as identity""".stripMargin in { + it should "DenseVector right outer product with one-dimensional SparseVector as identity" in { val vec = DenseVector(Array(1, 0, 1, 0, 0)) val unit = SparseVector(1, Array(0), Array(1)) vec.outer(unit) should equal(SparseMatrix.fromCOO(vec.size, 1, (0, 0, 1), (2, 0, 1))) } - it should s"""calculate left outer product for DenseVector - |with one-dimensional unit DenseVector as identity""".stripMargin in { + it should "DenseVector left outer product with one-dimensional unit DenseVector as identity" in { val vec = DenseVector(Array(1, 2, 3, 4, 5)) val unit = DenseVector(1) unit.outer(vec) should equal(DenseMatrix(1, vec.size, vec.data)) } - it should s"""calculate left outer product for SparseVector - |with 
one-dimensional unit DenseVector as identity""".stripMargin in { + it should "SparseVector left outer product with one-dimensional unit DenseVector as identity" in { val vec = SparseVector(5, Array(0, 1, 2, 3, 4), Array(1, 2, 3, 4, 5)) val unit = DenseVector(1) @@ -145,32 +139,14 @@ class DenseVectorSuite extends FlatSpec with Matchers { unit.outer(vec) should equal(SparseMatrix.fromCOO(1, vec.size, entries)) } - it should s"""calculate outer product with DenseVector - |via multiplication if both vectors are one-dimensional""".stripMargin in { - val vec1 = DenseVector(Array(2)) - val vec2 = DenseVector(Array(3)) - - vec1.outer(vec2) should be(DenseMatrix(1, 1, 2 * 3)) - } - - it should s"""calculate outer product with SparseVector - |via multiplication if both vectors are one-dimensional""".stripMargin in { - val vec1 = DenseVector(Array(2)) - val vec2 = SparseVector(1, Array(0), Array(3)) - - vec1.outer(vec2) should be(SparseMatrix.fromCOO(1, 1, (0, 0, 2 * 3))) - } - - it should "calculate outer product with DenseVector via multiplication if both vectors " + - "are one-dimensional" in { + it should "DenseVector outer product via multiplication if both vectors are one-dimensional" in { val vec1 = DenseVector(Array(2)) val vec2 = DenseVector(Array(3)) vec1.outer(vec2) should be(DenseMatrix(1, 1, 2 * 3)) } - it should "calculate outer product with SparseVector via multiplication if both vectors are " + - "one-dimensioan" in { + it should "SparseVector outer product via multiplication if both vectors are one-dimensional" in { val vec1 = DenseVector(Array(2)) val vec2 = SparseVector(1, Array(0), Array(3)) @@ -180,7 +156,7 @@ class DenseVectorSuite extends FlatSpec with Matchers { it should "calculate magnitude of vector" in { val vec = DenseVector(Array(1, 4, 8)) - vec.magnitude should be(9) + vec.magnitude should be(Math.sqrt((1 * 1) + (4 * 4) + (8 * 8))) } it should "convert from and to Breeze vector" in { 
http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala index 4dc7cf3..1b272e8 100644 --- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala @@ -26,38 +26,38 @@ import org.scalatest.{FlatSpec, Matchers} class PolynomialFeaturesITSuite extends FlatSpec - with Matchers - with FlinkTestBase { + with Matchers + with FlinkTestBase { behavior of "The polynomial base implementation" it should "map single element vectors to the polynomial vector space" in { val env = ExecutionEnvironment.getExecutionEnvironment - env.setParallelism (2) + env.setParallelism(2) - val input = Seq ( - LabeledVector (1.0, DenseVector (1)), - LabeledVector (2.0, DenseVector (2)) + val input = Seq( + LabeledVector(1.0, DenseVector(1)), + LabeledVector(2.0, DenseVector(2)) ) - val inputDS = env.fromCollection (input) + val inputDS = env.fromCollection(input) val transformer = PolynomialFeatures() - .setDegree (3) + .setDegree(3) val transformedDS = transformer.transform(inputDS) - val expectedMap = List ( - (1.0 -> DenseVector (1.0, 1.0, 1.0) ), - (2.0 -> DenseVector (8.0, 4.0, 2.0) ) - ) toMap + val expectedMap = Map( + 1.0 -> DenseVector(1.0, 1.0, 1.0), + 2.0 -> DenseVector(8.0, 4.0, 2.0) + ) val result = transformedDS.collect() for (entry <- result) { - expectedMap.contains (entry.label) should be (true) - entry.vector should equal (expectedMap (entry.label) ) + expectedMap.contains(entry.label) should be(true) + entry.vector 
should equal(expectedMap(entry.label)) } } @@ -86,7 +86,7 @@ class PolynomialFeaturesITSuite val result = transformedDS.collect() - for(entry <- result) { + for (entry <- result) { expectedMap.contains(entry.label) should be(true) entry.vector should equal(expectedMap(entry.label)) } @@ -111,12 +111,12 @@ class PolynomialFeaturesITSuite val result = transformedDS.collect() - val expectedMap = List( - (1.0 -> DenseVector()), - (2.0 -> DenseVector()) - ) toMap + val expectedMap = Map( + 1.0 -> DenseVector(), + 2.0 -> DenseVector() + ) - for(entry <- result) { + for (entry <- result) { expectedMap.contains(entry.label) should be(true) entry.vector should equal(expectedMap(entry.label)) } http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala index 767de1d..19aef68 100644 --- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala @@ -43,7 +43,7 @@ class StandardScalerITSuite expectedStd: Double): Unit = { scaledVectors.length should equal(data.length) - val numberOfFeatures = scaledVectors(0).size + val numberOfFeatures = scaledVectors.head.size var scaledMean: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures) var scaledStd: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures) @@ -95,7 +95,7 @@ class StandardScalerITSuite val dataSet = env.fromCollection(data).map(v => LabeledVector(1.0, v)) val scaler = StandardScaler() scaler.fit(dataSet) - val scaledVectors = scaler.transform(dataSet).map(lv 
=> lv.vector).collect() + val scaledVectors = scaler.transform(dataSet).map(_.vector).collect() checkVectors(scaledVectors, 0.0, 1.0) } @@ -106,7 +106,7 @@ class StandardScalerITSuite val dataSet = env.fromCollection(data).map(v => (v, 1.0)) val scaler = StandardScaler() scaler.fit(dataSet) - val scaledVectors = scaler.transform(dataSet).map(vl => vl._1).collect() + val scaledVectors = scaler.transform(dataSet).map(_._1).collect() checkVectors(scaledVectors, 0.0, 1.0) } http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala index 8dca0c3..043d8cb 100644 --- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala @@ -25,10 +25,7 @@ import scala.language.postfixOps import org.apache.flink.api.scala._ -class ALSITSuite - extends FlatSpec - with Matchers - with FlinkTestBase { +class ALSITSuite extends FlatSpec with Matchers with FlinkTestBase { override val parallelism = 2 @@ -49,7 +46,7 @@ class ALSITSuite als.fit(inputDS) - val testData = env.fromCollection(expectedResult.map{ + val testData = env.fromCollection(expectedResult.map { case (userID, itemID, rating) => (userID, itemID) }) @@ -57,9 +54,9 @@ class ALSITSuite predictions.length should equal(expectedResult.length) - val resultMap = expectedResult map { + val resultMap = expectedResult.map { case (uID, iID, value) => (uID, iID) -> value - } toMap + }.toMap predictions foreach { case (uID, iID, value) => { 
http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala index fc3df3a..72e84f2 100644 --- a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala +++ b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala @@ -29,7 +29,7 @@ class MultipleLinearRegressionITSuite with Matchers with FlinkTestBase { - behavior of "The multipe linear regression implementation" + behavior of "The multiple linear regression implementation" it should "estimate a linear function" in { val env = ExecutionEnvironment.getExecutionEnvironment