Repository: flink
Updated Branches:
  refs/heads/master 88576a0ee -> 1753b1d25


[FLINK-5426] [ml] Clean up the Flink Machine Learning library

Removed duplicate tests, improved scaladoc and naming, removed typos in 
scaladoc, introduced and improved use of constants, improved test-case naming.

This closes #3081.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1753b1d2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1753b1d2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1753b1d2

Branch: refs/heads/master
Commit: 1753b1d25b4d943c1ffbe7c3fb40e56b840c4f7d
Parents: 88576a0
Author: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Authored: Fri Jan 6 21:34:53 2017 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Jan 16 13:16:45 2017 +0100

----------------------------------------------------------------------
 .../apache/flink/ml/classification/SVM.scala    | 28 ++++++------
 .../org/apache/flink/ml/common/Block.scala      |  2 +-
 .../apache/flink/ml/common/FlinkMLTools.scala   | 29 ++++++------
 .../scala/org/apache/flink/ml/math/BLAS.scala   | 14 +++---
 .../scala/org/apache/flink/ml/math/Breeze.scala |  2 +-
 .../flink/ml/math/BreezeVectorConverter.scala   |  2 +-
 .../org/apache/flink/ml/math/DenseMatrix.scala  | 30 ++++++------
 .../org/apache/flink/ml/math/DenseVector.scala  | 17 ++++---
 .../scala/org/apache/flink/ml/math/Matrix.scala |  4 +-
 .../org/apache/flink/ml/math/SparseMatrix.scala | 26 +++++------
 .../org/apache/flink/ml/math/SparseVector.scala | 48 ++++++++++----------
 .../scala/org/apache/flink/ml/math/Vector.scala | 13 +++---
 .../math/distributed/DistributedRowMatrix.scala | 13 +++++-
 .../org/apache/flink/ml/math/package.scala      |  8 ++--
 .../main/scala/org/apache/flink/ml/nn/KNN.scala |  2 +-
 .../ml/optimization/PartialLossFunction.scala   |  2 +-
 .../apache/flink/ml/optimization/Solver.scala   |  2 +-
 .../apache/flink/ml/math/DenseVectorSuite.scala | 38 +++-------------
 .../PolynomialFeaturesITSuite.scala             | 40 ++++++++--------
 .../preprocessing/StandardScalerITSuite.scala   |  6 +--
 .../flink/ml/recommendation/ALSITSuite.scala    | 11 ++---
 .../MultipleLinearRegressionITSuite.scala       |  2 +-
 22 files changed, 162 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
index c9544f9..721dd69 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
@@ -139,7 +139,7 @@ class SVM extends Predictor[SVM] {
 
   /** Sets the number of data blocks/partitions
     *
-    * @param blocks
+    * @param blocks the number of blocks into which the input data will be 
split.
     * @return itself
     */
   def setBlocks(blocks: Int): SVM = {
@@ -149,7 +149,7 @@ class SVM extends Predictor[SVM] {
 
   /** Sets the number of outer iterations
     *
-    * @param iterations
+    * @param iterations the maximum number of iterations of the outer loop 
method
     * @return itself
     */
   def setIterations(iterations: Int): SVM = {
@@ -159,8 +159,8 @@ class SVM extends Predictor[SVM] {
 
   /** Sets the number of local SDCA iterations
     *
-    * @param localIterations
-    * @return itselft
+    * @param localIterations the maximum number of SDCA iterations
+    * @return itself
     */
   def setLocalIterations(localIterations: Int): SVM =  {
     parameters.add(LocalIterations, localIterations)
@@ -169,7 +169,7 @@ class SVM extends Predictor[SVM] {
 
   /** Sets the regularization constant
     *
-    * @param regularization
+    * @param regularization the regularization constant of the SVM algorithm
     * @return itself
     */
   def setRegularization(regularization: Double): SVM = {
@@ -179,7 +179,7 @@ class SVM extends Predictor[SVM] {
 
   /** Sets the stepsize for the weight vector updates
     *
-    * @param stepsize
+    * @param stepsize the initial step size for the updates of the weight 
vector
     * @return itself
     */
   def setStepsize(stepsize: Double): SVM = {
@@ -189,7 +189,7 @@ class SVM extends Predictor[SVM] {
 
   /** Sets the seed value for the random number generator
     *
-    * @param seed
+    * @param seed the seed to initialize the random number generator
     * @return itself
     */
   def setSeed(seed: Long): SVM = {
@@ -201,8 +201,9 @@ class SVM extends Predictor[SVM] {
     *
     * The [[predict ]] and [[evaluate]] functions will return +1.0 for items 
with a decision
     * function value above this threshold, and -1.0 for items below it.
-    * @param threshold
-    * @return
+    * @param threshold the limiting value for the decision function above 
which examples are
+    *                  labeled as positive
+    * @return itself
     */
   def setThreshold(threshold: Double): SVM = {
     parameters.add(ThresholdValue, threshold)
@@ -219,6 +220,7 @@ class SVM extends Predictor[SVM] {
     * @param outputDecisionFunction When set to true, [[predict ]] and 
[[evaluate]] return the raw
     *                               decision function values. When set to 
false, they return the
     *                               thresholded binary values (+1.0, -1.0).
+    * @return itself
     */
   def setOutputDecisionFunction(outputDecisionFunction: Boolean): SVM = {
     parameters.add(OutputDecisionFunction, outputDecisionFunction)
@@ -231,7 +233,7 @@ class SVM extends Predictor[SVM] {
   */
 object SVM{
 
-  val WEIGHT_VECTOR ="weightVector"
+  val WEIGHT_VECTOR_BROADCAST_NAME = "weightVector"
 
   // ========================================== Parameters 
=========================================
 
@@ -277,7 +279,7 @@ object SVM{
 
   /** Provides the operation that makes the predictions for individual 
examples.
     *
-    * @tparam T
+    * @tparam T Input data type which is a subtype of [[Vector]]
     * @return A PredictOperation, through which it is possible to predict a 
value, given a
     *         feature vector
     */
@@ -432,7 +434,7 @@ object SVM{
       var r: Random = _
 
       override def open(parameters: Configuration): Unit = {
-        originalW = 
getRuntimeContext.getBroadcastVariable(WEIGHT_VECTOR).get(0)
+        originalW = 
getRuntimeContext.getBroadcastVariable(WEIGHT_VECTOR_BROADCAST_NAME).get(0)
 
         if(r == null){
           r = new Random(seed ^ getRuntimeContext.getIndexOfThisSubtask)
@@ -497,7 +499,7 @@ object SVM{
       }
     }
 
-    blockedInputNumberElements.map(localSDCA).withBroadcastSet(w, 
WEIGHT_VECTOR)
+    blockedInputNumberElements.map(localSDCA).withBroadcastSet(w, 
WEIGHT_VECTOR_BROADCAST_NAME)
   }
 
   /** Maximizes the dual problem using hinge loss functions. It returns the 
alpha and weight

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/Block.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/Block.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/Block.scala
index 1af77ea..9c55534 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/Block.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/Block.scala
@@ -26,4 +26,4 @@ package org.apache.flink.ml.common
   * @param values
   * @tparam T
   */
-case class Block[T](index: Int, values: Vector[T]) {}
+case class Block[T](index: Int, values: Vector[T])

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
index 553ec00..bfc72a4 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/common/FlinkMLTools.scala
@@ -28,7 +28,7 @@ import org.apache.flink.core.fs.Path
 
 import scala.reflect.ClassTag
 
-/** FlinkTools contains a set of convenience functions for Flink's machine 
learning library:
+/** FlinkMLTools contains a set of convenience functions for Flink's machine 
learning library:
   *
   *  - persist:
   *  Takes up to 5 [[DataSet]]s and file paths. Each [[DataSet]] is written to 
the specified
@@ -41,10 +41,11 @@ import scala.reflect.ClassTag
   *
   */
 object FlinkMLTools {
+  val EXECUTION_ENVIRONMENT_NAME = "FlinkMLTools persist"
 
   /** Registers the different FlinkML related types for Kryo serialization
     *
-    * @param env
+    * @param env The Flink execution environment where the types need to be 
registered
     */
   def registerFlinkMLTypes(env: ExecutionEnvironment): Unit = {
 
@@ -91,7 +92,7 @@ object FlinkMLTools {
     outputFormat.setWriteMode(WriteMode.OVERWRITE)
 
     dataset.output(outputFormat)
-    env.execute("FlinkTools persist")
+    env.execute(EXECUTION_ENVIRONMENT_NAME)
 
     val inputFormat = new TypeSerializerInputFormat[T](dataset.getType)
     inputFormat.setFilePath(filePath)
@@ -111,7 +112,7 @@ object FlinkMLTools {
     * @return Tuple of [[DataSet]]s reading the just written files
     */
   def persist[A: ClassTag: TypeInformation ,B: ClassTag: TypeInformation](ds1: 
DataSet[A], ds2:
-  DataSet[B], path1: String, path2: String):(DataSet[A], DataSet[B])  = {
+  DataSet[B], path1: String, path2: String): (DataSet[A], DataSet[B]) = {
     val env = ds1.getExecutionEnvironment
 
     val f1 = new Path(path1)
@@ -130,7 +131,7 @@ object FlinkMLTools {
 
     ds2.output(of2)
 
-    env.execute("FlinkTools persist")
+    env.execute(EXECUTION_ENVIRONMENT_NAME)
 
     val if1 = new TypeSerializerInputFormat[A](ds1.getType)
     if1.setFilePath(f1)
@@ -184,7 +185,7 @@ object FlinkMLTools {
 
     ds3.output(of3)
 
-    env.execute("FlinkTools persist")
+    env.execute(EXECUTION_ENVIRONMENT_NAME)
 
     val if1 = new TypeSerializerInputFormat[A](ds1.getType)
     if1.setFilePath(f1)
@@ -255,7 +256,7 @@ object FlinkMLTools {
 
     ds4.output(of4)
 
-    env.execute("FlinkTools persist")
+    env.execute(EXECUTION_ENVIRONMENT_NAME)
 
     val if1 = new TypeSerializerInputFormat[A](ds1.getType)
     if1.setFilePath(f1)
@@ -340,7 +341,7 @@ object FlinkMLTools {
 
     ds5.output(of5)
 
-    env.execute("FlinkTools persist")
+    env.execute(EXECUTION_ENVIRONMENT_NAME)
 
     val if1 = new TypeSerializerInputFormat[A](ds1.getType)
     if1.setFilePath(f1)
@@ -363,11 +364,11 @@ object FlinkMLTools {
 
   /** Groups the DataSet input into numBlocks blocks.
     * 
-    * @param input
+    * @param input the input dataset
     * @param numBlocks Number of Blocks
     * @param partitionerOption Optional partitioner to control the partitioning
-    * @tparam T
-    * @return
+    * @tparam T Type of the [[DataSet]]'s elements
+    * @return The different datasets grouped into blocks
     */
   def block[T: TypeInformation: ClassTag](
     input: DataSet[T],
@@ -395,10 +396,10 @@ object FlinkMLTools {
     }
 
     preGroupBlockIDInput.groupBy(0).reduceGroup {
-      iter => {
-        val array = iter.toVector
+      iterator => {
+        val array = iterator.toVector
 
-        val blockID = array(0)._1
+        val blockID = array.head._1
         val elements = array.map(_._2)
 
         Block[T](blockID, elements)

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala
index 8ea3b65..d134623 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BLAS.scala
@@ -77,7 +77,7 @@ object BLAS extends Serializable {
     val xValues = x.data
     val xIndices = x.indices
     val yValues = y.data
-    val nnz = xIndices.size
+    val nnz = xIndices.length
 
     if (a == 1.0) {
       var k = 0
@@ -130,7 +130,7 @@ object BLAS extends Serializable {
     val xValues = x.data
     val xIndices = x.indices
     val yValues = y.data
-    val nnz = xIndices.size
+    val nnz = xIndices.length
 
     var sum = 0.0
     var k = 0
@@ -149,8 +149,8 @@ object BLAS extends Serializable {
     val xIndices = x.indices
     val yValues = y.data
     val yIndices = y.indices
-    val nnzx = xIndices.size
-    val nnzy = yIndices.size
+    val nnzx = xIndices.length
+    val nnzy = yIndices.length
 
     var kx = 0
     var ky = 0
@@ -183,7 +183,7 @@ object BLAS extends Serializable {
             val sxIndices = sx.indices
             val sxValues = sx.data
             val dyValues = dy.data
-            val nnz = sxIndices.size
+            val nnz = sxIndices.length
 
             var i = 0
             var k = 0
@@ -215,9 +215,9 @@ object BLAS extends Serializable {
   def scal(a: Double, x: Vector): Unit = {
     x match {
       case sx: SparseVector =>
-        f2jBLAS.dscal(sx.data.size, a, sx.data, 1)
+        f2jBLAS.dscal(sx.data.length, a, sx.data, 1)
       case dx: DenseVector =>
-        f2jBLAS.dscal(dx.data.size, a, dx.data, 1)
+        f2jBLAS.dscal(dx.data.length, a, dx.data, 1)
       case _ =>
         throw new IllegalArgumentException(s"scal doesn't support vector type 
${x.getClass}.")
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala
index 74d4d8f..5058ec3 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala
@@ -18,7 +18,7 @@
 
 package org.apache.flink.ml.math
 
-import breeze.linalg.{ Matrix => BreezeMatrix, DenseMatrix => 
BreezeDenseMatrix,
+import breeze.linalg.{Matrix => BreezeMatrix, DenseMatrix => BreezeDenseMatrix,
 CSCMatrix => BreezeCSCMatrix, DenseVector => BreezeDenseVector, SparseVector 
=> BreezeSparseVector,
 Vector => BreezeVector}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
index 0bb24f3..dac8fb2 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
@@ -22,7 +22,7 @@ import breeze.linalg.{Vector => BreezeVector}
 
 /** Type class which allows the conversion from Breeze vectors to Flink vectors
   *
-  * @tparam T Resulting type of the conversion
+  * @tparam T Resulting type of the conversion, subtype of [[Vector]]
   */
 trait BreezeVectorConverter[T <: Vector] extends Serializable {
   /** Converts a Breeze vector into a Flink vector of type T

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala
index 4ae565e..80d1dc3 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala
@@ -26,12 +26,8 @@ package org.apache.flink.ml.math
  * @param numCols Number of columns
  * @param data Array of matrix elements in column major order
  */
-case class DenseMatrix(
-    val numRows: Int,
-    val numCols: Int,
-    val data: Array[Double])
-  extends Matrix
-  with Serializable{
+case class DenseMatrix(numRows: Int, numCols: Int, data: Array[Double])
+  extends Matrix with Serializable {
 
   import DenseMatrix._
 
@@ -55,20 +51,18 @@ case class DenseMatrix(
     val result = StringBuilder.newBuilder
     result.append(s"DenseMatrix($numRows, $numCols)\n")
 
-    val linewidth = LINE_WIDTH
-
     val columnsFieldWidths = for(row <- 0 until math.min(numRows, MAX_ROWS)) 
yield {
       var column = 0
       var maxFieldWidth = 0
 
-      while(column * maxFieldWidth < linewidth && column < numCols) {
+      while(column * maxFieldWidth < LINE_WIDTH && column < numCols) {
         val fieldWidth = printEntry(row, column).length + 2
 
         if(fieldWidth > maxFieldWidth) {
           maxFieldWidth = fieldWidth
         }
 
-        if(column * maxFieldWidth < linewidth) {
+        if(column * maxFieldWidth < LINE_WIDTH) {
           column += 1
         }
       }
@@ -128,6 +122,10 @@ case class DenseMatrix(
     data(index) = value
   }
 
+  /** Converts the DenseMatrix to a SparseMatrix
+    *
+    * @return SparseMatrix build from all the non-null values
+    */
   def toSparseMatrix: SparseMatrix = {
     val entries = for(row <- 0 until numRows; col <- 0 until numCols) yield {
       (row, col, apply(row, col))
@@ -138,9 +136,9 @@ case class DenseMatrix(
 
   /** Calculates the linear index of the respective matrix entry
     *
-    * @param row
-    * @param col
-    * @return
+    * @param row row index
+    * @param col column index
+    * @return the index of the value according to the row and index
     */
   private def locate(row: Int, col: Int): Int = {
     require(0 <= row && row < numRows && 0 <= col && col < numCols,
@@ -151,9 +149,9 @@ case class DenseMatrix(
 
   /** Converts the entry at (row, col) to string
     *
-    * @param row
-    * @param col
-    * @return
+    * @param row row index
+    * @param col column index
+    * @return Takes the value according to the row and index and convert it to 
string
     */
   private def printEntry(row: Int, col: Int): String = {
     val index = locate(row, col)

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
index 5e70741..c0c37bb 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
@@ -26,14 +26,11 @@ import breeze.linalg.{SparseVector => BreezeSparseVector, 
DenseVector => BreezeD
  *
  * @param data Array of doubles to store the vector elements
  */
-case class DenseVector(
-    data: Array[Double])
-  extends Vector
-  with Serializable {
+case class DenseVector(data: Array[Double]) extends Vector with Serializable {
 
   /**
    * Number of elements in a vector
-   * @return
+   * @return the number of the elements in the vector
    */
   override def size: Int = {
     data.length
@@ -136,10 +133,16 @@ case class DenseVector(
 
   /** Magnitude of a vector
     *
-    * @return
+    * @return The length of the vector
     */
-  override def magnitude: Double = math.sqrt(data.map(x => x * x).sum)
+  override def magnitude: Double = {
+    math.sqrt(data.map(x => x * x).sum)
+  }
 
+  /** Convert to a [[SparseVector]]
+    *
+    * @return Creates a SparseVector from the DenseVector
+    */
   def toSparseVector: SparseVector = {
     val nonZero = (0 until size).zip(data).filter(_._2 != 0)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala
index ba6a781..ea2b4cd 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala
@@ -25,13 +25,13 @@ trait Matrix {
 
   /** Number of rows
     *
-    * @return
+    * @return number of rows in the matrix
     */
   def numRows: Int
 
   /** Number of columns
     *
-    * @return
+    * @return number of columns in the matrix
     */
   def numCols: Int
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala
index fe58ddb..d7dd9dc 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala
@@ -47,7 +47,6 @@ class SparseMatrix(
     * @return matrix entry at (row, col)
     */
   override def apply(row: Int, col: Int): Double = {
-
     val index = locate(row, col)
 
     if(index < 0){
@@ -152,10 +151,11 @@ object SparseMatrix{
 
   /** Constructs a sparse matrix from a coordinate list (COO) representation 
where each entry
     * is stored as a tuple of (rowIndex, columnIndex, value).
-    * @param numRows
-    * @param numCols
-    * @param entries
-    * @return
+    *
+    * @param numRows Number of rows
+    * @param numCols Number of columns
+    * @param entries Data entries in the matrix
+    * @return Newly constructed sparse matrix
     */
   def fromCOO(numRows: Int, numCols: Int, entries: (Int, Int, Double)*): 
SparseMatrix = {
     fromCOO(numRows, numCols, entries)
@@ -164,10 +164,10 @@ object SparseMatrix{
   /** Constructs a sparse matrix from a coordinate list (COO) representation 
where each entry
     * is stored as a tuple of (rowIndex, columnIndex, value).
     *
-    * @param numRows
-    * @param numCols
-    * @param entries
-    * @return
+    * @param numRows Number of rows
+    * @param numCols Number of columns
+    * @param entries Data entries in the matrix
+    * @return Newly constructed sparse matrix
     */
   def fromCOO(numRows: Int, numCols: Int, entries: Iterable[(Int, Int, 
Double)]): SparseMatrix = {
     val entryArray = entries.toArray
@@ -256,10 +256,10 @@ object SparseMatrix{
     * cannot infer that the tuple has to be of type (Int, Int, Double) because 
of the overloading
     * with the Iterable type.
     *
-    * @param numRows
-    * @param numCols
-    * @param entry
-    * @return
+    * @param numRows Number of rows
+    * @param numCols Number of columns
+    * @param entry Data entries in the matrix
+    * @return Newly constructed sparse matrix
     */
   def fromCOO(numRows: Int, numCols: Int, entry: (Int, Int, Int)): 
SparseMatrix = {
     fromCOO(numRows, numCols, (entry._1, entry._2, entry._3.toDouble))

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
index fec018f..29c16cc 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
@@ -25,12 +25,8 @@ import scala.util.Sorting
 /** Sparse vector implementation storing the data in two arrays. One index 
contains the sorted
   * indices of the non-zero vector entries and the other the corresponding 
vector entries
   */
-case class SparseVector(
-    size: Int,
-    indices: Array[Int],
-    data: Array[Double])
-  extends Vector
-  with Serializable {
+case class SparseVector(size: Int, indices: Array[Int], data: Array[Double])
+  extends Vector with Serializable {
 
   /** Updates the element at the given index with the provided value
     *
@@ -41,8 +37,8 @@ case class SparseVector(
     val resolvedIndex = locate(index)
 
     if (resolvedIndex < 0) {
-      throw new IllegalArgumentException("Cannot update zero value of sparse 
vector at index " +
-        index)
+      throw new IllegalArgumentException("Cannot update zero value of sparse 
vector at " +
+        s"index $index")
     } else {
       data(resolvedIndex) = value
     }
@@ -50,7 +46,7 @@ case class SparseVector(
 
   /** Copies the vector instance
     *
-    * @return Copy of the vector instance
+    * @return Copy of the [[SparseVector]] instance
     */
   override def copy: SparseVector = {
     new SparseVector(size, indices.clone, data.clone)
@@ -86,13 +82,11 @@ case class SparseVector(
     }
   }
 
-  /** Returns the outer product (a.k.a. Kronecker product) of `this`
-    * with `other`. The result is given in 
[[org.apache.flink.ml.math.SparseMatrix]]
-    * representation.
+  /** Returns the outer product (a.k.a. Kronecker product) of `this` with 
`other`. The result is
+    * given in [[SparseMatrix]] representation.
     *
-    * @param other a Vector
-    * @return the [[org.apache.flink.ml.math.SparseMatrix]] which equals the 
outer product of `this`
-    *         with `other.`
+    * @param other a [[Vector]]
+    * @return the [[SparseMatrix]] which equals the outer product of `this` 
with `other.`
     */
   override def outer(other: Vector): SparseMatrix = {
     val numRows = size
@@ -121,7 +115,7 @@ case class SparseVector(
 
   /** Magnitude of a vector
     *
-    * @return
+    * @return The length of the vector
     */
   override def magnitude: Double = math.sqrt(data.map(x => x * x).sum)
 
@@ -140,6 +134,10 @@ case class SparseVector(
     }
   }
 
+  /** Converts the [[SparseVector]] to a [[DenseVector]]
+    *
+    * @return The DenseVector out of the SparseVector
+    */
   def toDenseVector: DenseVector = {
     val denseVector = DenseVector.zeros(size)
 
@@ -182,9 +180,9 @@ object SparseVector {
   /** Constructs a sparse vector from a coordinate list (COO) representation 
where each entry
     * is stored as a tuple of (index, value).
     *
-    * @param size
-    * @param entries
-    * @return
+    * @param size The number of elements in the vector
+    * @param entries The values in the vector
+    * @return a new [[SparseVector]]
     */
   def fromCOO(size: Int, entries: (Int, Double)*): SparseVector = {
     fromCOO(size, entries)
@@ -193,9 +191,9 @@ object SparseVector {
   /** Constructs a sparse vector from a coordinate list (COO) representation 
where each entry
     * is stored as a tuple of (index, value).
     *
-    * @param size
-    * @param entries
-    * @return
+    * @param size The number of elements in the vector
+    * @param entries An iterator supplying the values in the vector
+    * @return a new [[SparseVector]]
     */
   def fromCOO(size: Int, entries: Iterable[(Int, Double)]): SparseVector = {
     val entryArray = entries.toArray
@@ -255,9 +253,9 @@ object SparseVector {
     * type inference mechanism cannot infer that the second tuple value has to 
be of type Double
     * if only a single tuple is provided.
     *
-    * @param size
-    * @param entry
-    * @return
+    * @param size The number of elements in the vector
+    * @param entry The value in the vector
+    * @return a new [[SparseVector]]
     */
   def fromCOO(size: Int, entry: (Int, Int)): SparseVector = {
     fromCOO(size, (entry._1, entry._2.toDouble))

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
index e52328d..0c911d0 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
@@ -27,21 +27,21 @@ trait Vector extends Serializable {
 
   /** Number of elements in a vector
     *
-    * @return
+    * @return The number of elements of the vector
     */
   def size: Int
 
   /** Element wise access function
     *
-    * * @param index index of the accessed element
-    * @return element with index
+    * @param index index of the accessed element
+    * @return value of the associated with the index
     */
   def apply(index: Int): Double
 
   /** Updates the element at the given index with the provided value
     *
-    * @param index
-    * @param value
+    * @param index The index of the element to be updated
+    * @param value The new value
     */
   def update(index: Int, value: Double): Unit
 
@@ -60,7 +60,6 @@ trait Vector extends Serializable {
 
   /** Returns the outer product of the recipient and the argument
     *
-    *
     * @param other a Vector
     * @return a matrix
     */
@@ -68,7 +67,7 @@ trait Vector extends Serializable {
 
   /** Magnitude of a vector
     *
-    * @return
+    * @return The length of the vector
     */
   def magnitude: Double
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
index 9092e5c..5dca01a 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/distributed/DistributedRowMatrix.scala
@@ -19,9 +19,9 @@
 package org.apache.flink.ml.math.distributed
 
 import org.apache.flink.api.scala._
+import org.apache.flink.ml.math._
 import org.apache.flink.ml.math.Breeze._
 import org.apache.flink.ml.math.distributed.DistributedMatrix._
-import org.apache.flink.ml.math._
 
 /** Represents distributed row-major matrix.
   *
@@ -37,6 +37,8 @@ class DistributedRowMatrix(
 
   /** Collects the data in the form of a sequence of coordinates associated 
with their values.
     * This operation immediately triggers program execution.
+    *
+    * @return Returns the matrix in the sparse coordinate format
     */
   def toCOO: Seq[(MatrixRowIndex, MatrixColIndex, Double)] = {
     val localRows = data.collect()
@@ -49,6 +51,8 @@ class DistributedRowMatrix(
 
   /** Collects the data in the form of a SparseMatrix. This operation 
immediately triggers program
     * execution.
+    *
+    * @return Returns the matrix as a local [[SparseMatrix]]
     */
   def toLocalSparseMatrix: SparseMatrix = {
     val localMatrix = SparseMatrix.fromCOO(this.numRows, this.numCols, 
this.toCOO)
@@ -61,6 +65,8 @@ class DistributedRowMatrix(
   // TODO: convert to dense representation on the distributed matrix and 
collect it afterward
   /** Collects the data in the form of a DenseMatrix. This operation 
immediately triggers program
     * execution.
+    *
+    * @return Returns the matrix as a [[DenseMatrix]]
     */
   def toLocalDenseMatrix: DenseMatrix = this.toLocalSparseMatrix.toDenseMatrix
 
@@ -68,6 +74,7 @@ class DistributedRowMatrix(
     *
     * @param func  a function to be applied
     * @param other a [[DistributedRowMatrix]] to apply the function together
+    * @return a new [[DistributedRowMatrix]] resulting from applying the function row-wise
     */
   def byRowOperation(
     func: (Vector, Vector) => Vector,
@@ -101,6 +108,7 @@ class DistributedRowMatrix(
   /** Adds this matrix to another matrix.
     *
     * @param other a [[DistributedRowMatrix]] to be added
+    * @return [[DistributedRowMatrix]] representing the two matrices added.
     */
   def add(other: DistributedRowMatrix): DistributedRowMatrix = {
     val addFunction = (x: Vector, y: Vector) => (x.asBreeze + 
y.asBreeze).fromBreeze
@@ -110,6 +118,8 @@ class DistributedRowMatrix(
   /** Subtracts another matrix from this matrix.
     *
     * @param other a [[DistributedRowMatrix]] to be subtracted from this matrix
+    * @return [[DistributedRowMatrix]] representing the supplied matrix subtracted from the
+    *        original matrix.
     */
   def subtract(other: DistributedRowMatrix): DistributedRowMatrix = {
     val subFunction = (x: Vector, y: Vector) => (x.asBreeze - 
y.asBreeze).fromBreeze
@@ -127,6 +137,7 @@ object DistributedRowMatrix {
     * @param numCols  Number of columns
     * @param isSorted If false, sorts the row to properly build the matrix 
representation.
     *                 If already sorted, set this parameter to true to skip 
sorting.
+    * @return the [[DistributedRowMatrix]] built from the original coordinate 
matrix
     */
   def fromCOO(data: DataSet[(MatrixRowIndex, MatrixColIndex, Double)],
     numRows: Int,

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
index 4c7f254..7493602 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
@@ -89,22 +89,22 @@ package object math {
 
   /** Stores the vector values in a dense array
     *
-    * @param vector
+    * @param vector Subtype of [[Vector]]
     * @return Array containing the vector values
     */
   def vector2Array(vector: Vector): Array[Double] = {
     vector match {
       case dense: DenseVector => dense.data.clone
 
-      case sparse: SparseVector =>
+      case sparse: SparseVector => {
         val result = new Array[Double](sparse.size)
 
-        for((index, value) <- sparse) {
+        for ((index, value) <- sparse) {
           result(index) = value
         }
 
         result
-
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
index d15fdaf..527e636 100644
--- a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
+++ b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/nn/KNN.scala
@@ -131,7 +131,7 @@ class KNN extends Predictor[KNN] {
 
   /** Parameter a user can specify if one of the training or test sets are 
small
     *
-    * @param sizeHint
+    * @param sizeHint cross hint that tells the system which sizes to expect 
from the data sets
     */
   def setSizeHint(sizeHint: CrossHint): KNN = {
     parameters.add(SizeHint, sizeHint)

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
index 5cf69b6..ac0053e 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
@@ -62,6 +62,6 @@ object SquaredLoss extends PartialLossFunction {
     * @return
     */
   override def derivative(prediction: Double, label: Double): Double = {
-    (prediction - label)
+    prediction - label
   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
index 3bad038..ee91bd1 100644
--- 
a/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
+++ 
b/flink-libraries/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
@@ -80,7 +80,7 @@ abstract class Solver extends Serializable with 
WithParameters {
     dimensionDS.map {
       dimension =>
         val values = Array.fill(dimension)(0.0)
-        new WeightVector(DenseVector(values), 0.0)
+        WeightVector(DenseVector(values), 0.0)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
index aa1c4f9..bb5fd45 100644
--- 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
+++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/math/DenseVectorSuite.scala
@@ -110,34 +110,28 @@ class DenseVectorSuite extends FlatSpec with Matchers {
     vec1.outer(vec2) should be(SparseMatrix.fromCOO(5, 5, entries))
   }
 
-
-
-  it should s"""calculate right outer product with DenseVector
-               |with one-dimensional unit DenseVector as 
identity""".stripMargin in {
+  it should "DenseVector right outer product with one-dimensional DenseVector 
as identity" in {
     val vec = DenseVector(Array(1, 0, 1, 0, 0))
     val unit = DenseVector(1)
 
     vec.outer(unit) should equal(DenseMatrix(vec.size, 1, vec.data))
   }
 
-  it should s"""calculate right outer product with DenseVector
-               |with one-dimensional unit SparseVector as 
identity""".stripMargin in {
+  it should "DenseVector right outer product with one-dimensional SparseVector 
as identity" in {
     val vec = DenseVector(Array(1, 0, 1, 0, 0))
     val unit = SparseVector(1, Array(0), Array(1))
 
     vec.outer(unit) should equal(SparseMatrix.fromCOO(vec.size, 1, (0, 0, 1), 
(2, 0, 1)))
   }
 
-  it should s"""calculate left outer product for DenseVector
-               |with one-dimensional unit DenseVector as 
identity""".stripMargin in {
+  it should "DenseVector left outer product with one-dimensional unit 
DenseVector as identity" in {
     val vec = DenseVector(Array(1, 2, 3, 4, 5))
     val unit = DenseVector(1)
 
     unit.outer(vec) should equal(DenseMatrix(1, vec.size, vec.data))
   }
 
-  it should s"""calculate left outer product for SparseVector
-               |with one-dimensional unit DenseVector as 
identity""".stripMargin in {
+  it should "SparseVector left outer product with one-dimensional unit 
DenseVector as identity" in {
     val vec = SparseVector(5, Array(0, 1, 2, 3, 4), Array(1, 2, 3, 4, 5))
     val unit = DenseVector(1)
 
@@ -145,32 +139,14 @@ class DenseVectorSuite extends FlatSpec with Matchers {
     unit.outer(vec) should equal(SparseMatrix.fromCOO(1, vec.size, entries))
   }
 
-  it should s"""calculate outer product with DenseVector
-               |via multiplication if both vectors are 
one-dimensional""".stripMargin in {
-    val vec1 = DenseVector(Array(2))
-    val vec2 = DenseVector(Array(3))
-
-    vec1.outer(vec2) should be(DenseMatrix(1, 1, 2 * 3))
-  }
-
-  it should s"""calculate outer product with SparseVector
-               |via multiplication if both vectors are 
one-dimensional""".stripMargin in {
-    val vec1 = DenseVector(Array(2))
-    val vec2 = SparseVector(1, Array(0), Array(3))
-
-    vec1.outer(vec2) should be(SparseMatrix.fromCOO(1, 1, (0, 0, 2 * 3)))
-  }
-
-  it should "calculate outer product with DenseVector via multiplication if 
both vectors " +
-    "are one-dimensional" in {
+  it should "DenseVector outer product via multiplication if both vectors are 
one-dimensional" in {
     val vec1 = DenseVector(Array(2))
     val vec2 = DenseVector(Array(3))
 
     vec1.outer(vec2) should be(DenseMatrix(1, 1, 2 * 3))
   }
 
-  it should "calculate outer product with SparseVector via multiplication if 
both vectors are " +
-    "one-dimensioan" in {
+  it should "SparseVector outer product via multiplication if both vectors are 
one-dimensional" in {
     val vec1 = DenseVector(Array(2))
     val vec2 = SparseVector(1, Array(0), Array(3))
 
@@ -180,7 +156,7 @@ class DenseVectorSuite extends FlatSpec with Matchers {
   it should "calculate magnitude of vector" in {
     val vec = DenseVector(Array(1, 4, 8))
 
-    vec.magnitude should be(9)
+    vec.magnitude should be(Math.sqrt((1 * 1) + (4 * 4) + (8 * 8)))
   }
 
   it should "convert from and to Breeze vector" in {

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala
 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala
index 4dc7cf3..1b272e8 100644
--- 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala
+++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/PolynomialFeaturesITSuite.scala
@@ -26,38 +26,38 @@ import org.scalatest.{FlatSpec, Matchers}
 
 class PolynomialFeaturesITSuite
   extends FlatSpec
-  with Matchers
-  with FlinkTestBase {
+    with Matchers
+    with FlinkTestBase {
 
   behavior of "The polynomial base implementation"
 
   it should "map single element vectors to the polynomial vector space" in {
     val env = ExecutionEnvironment.getExecutionEnvironment
 
-    env.setParallelism (2)
+    env.setParallelism(2)
 
-    val input = Seq (
-    LabeledVector (1.0, DenseVector (1)),
-    LabeledVector (2.0, DenseVector (2))
+    val input = Seq(
+      LabeledVector(1.0, DenseVector(1)),
+      LabeledVector(2.0, DenseVector(2))
     )
 
-    val inputDS = env.fromCollection (input)
+    val inputDS = env.fromCollection(input)
 
     val transformer = PolynomialFeatures()
-    .setDegree (3)
+      .setDegree(3)
 
     val transformedDS = transformer.transform(inputDS)
 
-    val expectedMap = List (
-    (1.0 -> DenseVector (1.0, 1.0, 1.0) ),
-    (2.0 -> DenseVector (8.0, 4.0, 2.0) )
-    ) toMap
+    val expectedMap = Map(
+      1.0 -> DenseVector(1.0, 1.0, 1.0),
+      2.0 -> DenseVector(8.0, 4.0, 2.0)
+    )
 
     val result = transformedDS.collect()
 
     for (entry <- result) {
-    expectedMap.contains (entry.label) should be (true)
-    entry.vector should equal (expectedMap (entry.label) )
+      expectedMap.contains(entry.label) should be(true)
+      entry.vector should equal(expectedMap(entry.label))
     }
   }
 
@@ -86,7 +86,7 @@ class PolynomialFeaturesITSuite
 
     val result = transformedDS.collect()
 
-    for(entry <- result) {
+    for (entry <- result) {
       expectedMap.contains(entry.label) should be(true)
       entry.vector should equal(expectedMap(entry.label))
     }
@@ -111,12 +111,12 @@ class PolynomialFeaturesITSuite
 
     val result = transformedDS.collect()
 
-    val expectedMap = List(
-      (1.0 -> DenseVector()),
-      (2.0 -> DenseVector())
-    ) toMap
+    val expectedMap = Map(
+      1.0 -> DenseVector(),
+      2.0 -> DenseVector()
+    )
 
-    for(entry <- result) {
+    for (entry <- result) {
       expectedMap.contains(entry.label) should be(true)
       entry.vector should equal(expectedMap(entry.label))
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
index 767de1d..19aef68 100644
--- 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
+++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
@@ -43,7 +43,7 @@ class StandardScalerITSuite
       expectedStd: Double): Unit = {
     scaledVectors.length should equal(data.length)
 
-    val numberOfFeatures = scaledVectors(0).size
+    val numberOfFeatures = scaledVectors.head.size
     var scaledMean: linalg.Vector[Double] = 
linalg.DenseVector.zeros(numberOfFeatures)
     var scaledStd: linalg.Vector[Double] = 
linalg.DenseVector.zeros(numberOfFeatures)
 
@@ -95,7 +95,7 @@ class StandardScalerITSuite
     val dataSet = env.fromCollection(data).map(v => LabeledVector(1.0, v))
     val scaler = StandardScaler()
     scaler.fit(dataSet)
-    val scaledVectors = scaler.transform(dataSet).map(lv => 
lv.vector).collect()
+    val scaledVectors = scaler.transform(dataSet).map(_.vector).collect()
 
     checkVectors(scaledVectors, 0.0, 1.0)
   }
@@ -106,7 +106,7 @@ class StandardScalerITSuite
     val dataSet = env.fromCollection(data).map(v => (v, 1.0))
     val scaler = StandardScaler()
     scaler.fit(dataSet)
-    val scaledVectors = scaler.transform(dataSet).map(vl => vl._1).collect()
+    val scaledVectors = scaler.transform(dataSet).map(_._1).collect()
 
     checkVectors(scaledVectors, 0.0, 1.0)
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
index 8dca0c3..043d8cb 100644
--- 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
+++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/recommendation/ALSITSuite.scala
@@ -25,10 +25,7 @@ import scala.language.postfixOps
 
 import org.apache.flink.api.scala._
 
-class ALSITSuite
-  extends FlatSpec
-  with Matchers
-  with FlinkTestBase {
+class ALSITSuite extends FlatSpec with Matchers with FlinkTestBase {
 
   override val parallelism = 2
 
@@ -49,7 +46,7 @@ class ALSITSuite
 
     als.fit(inputDS)
 
-    val testData = env.fromCollection(expectedResult.map{
+    val testData = env.fromCollection(expectedResult.map {
       case (userID, itemID, rating) => (userID, itemID)
     })
 
@@ -57,9 +54,9 @@ class ALSITSuite
 
     predictions.length should equal(expectedResult.length)
 
-    val resultMap = expectedResult map {
+    val resultMap = expectedResult.map {
       case (uID, iID, value) => (uID, iID) -> value
-    } toMap
+    }.toMap
 
     predictions foreach {
       case (uID, iID, value) => {

http://git-wip-us.apache.org/repos/asf/flink/blob/1753b1d2/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
index fc3df3a..72e84f2 100644
--- 
a/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
+++ 
b/flink-libraries/flink-ml/src/test/scala/org/apache/flink/ml/regression/MultipleLinearRegressionITSuite.scala
@@ -29,7 +29,7 @@ class MultipleLinearRegressionITSuite
   with Matchers
   with FlinkTestBase {
 
-  behavior of "The multipe linear regression implementation"
+  behavior of "The multiple linear regression implementation"
 
   it should "estimate a linear function" in {
     val env = ExecutionEnvironment.getExecutionEnvironment

Reply via email to