http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala
deleted file mode 100644
index 74d4d8f..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Breeze.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.math
-
-import breeze.linalg.{ Matrix => BreezeMatrix, DenseMatrix => 
BreezeDenseMatrix,
-CSCMatrix => BreezeCSCMatrix, DenseVector => BreezeDenseVector, SparseVector 
=> BreezeSparseVector,
-Vector => BreezeVector}
-
-/** This class contains convenience function to wrap a matrix/vector into a 
breeze matrix/vector
-  * and to unwrap it again.
-  *
-  */
-object Breeze {
-
-  implicit class Matrix2BreezeConverter(matrix: Matrix) {
-    def asBreeze: BreezeMatrix[Double] = {
-      matrix match {
-        case dense: DenseMatrix =>
-          new BreezeDenseMatrix[Double](
-            dense.numRows,
-            dense.numCols,
-            dense.data)
-
-        case sparse: SparseMatrix =>
-          new BreezeCSCMatrix[Double](
-            sparse.data,
-            sparse.numRows,
-            sparse.numCols,
-            sparse.colPtrs,
-            sparse.rowIndices
-          )
-      }
-    }
-  }
-
-  implicit class Breeze2MatrixConverter(matrix: BreezeMatrix[Double]) {
-    def fromBreeze: Matrix = {
-      matrix match {
-        case dense: BreezeDenseMatrix[Double] =>
-          new DenseMatrix(dense.rows, dense.cols, dense.data)
-
-        case sparse: BreezeCSCMatrix[Double] =>
-          new SparseMatrix(sparse.rows, sparse.cols, sparse.rowIndices, 
sparse.colPtrs, sparse.data)
-      }
-    }
-  }
-
-  implicit class BreezeArrayConverter[T](array: Array[T]) {
-    def asBreeze: BreezeDenseVector[T] = {
-      new BreezeDenseVector[T](array)
-    }
-  }
-
-  implicit class Breeze2VectorConverter(vector: BreezeVector[Double]) {
-    def fromBreeze[T <: Vector: BreezeVectorConverter]: T = {
-      val converter = implicitly[BreezeVectorConverter[T]]
-      converter.convert(vector)
-    }
-  }
-
-  implicit class Vector2BreezeConverter(vector: Vector) {
-    def asBreeze: BreezeVector[Double] = {
-      vector match {
-        case dense: DenseVector =>
-          new breeze.linalg.DenseVector(dense.data)
-
-        case sparse: SparseVector =>
-          new BreezeSparseVector(sparse.indices, sparse.data, sparse.size)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
deleted file mode 100644
index 0bb24f3..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.math
-
-import breeze.linalg.{Vector => BreezeVector}
-
-/** Type class which allows the conversion from Breeze vectors to Flink vectors
-  *
-  * @tparam T Resulting type of the conversion
-  */
-trait BreezeVectorConverter[T <: Vector] extends Serializable {
-  /** Converts a Breeze vector into a Flink vector of type T
-    *
-    * @param vector Breeze vector
-    * @return Flink vector of type T
-    */
-  def convert(vector: BreezeVector[Double]): T
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala
deleted file mode 100644
index 4ae565e..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseMatrix.scala
+++ /dev/null
@@ -1,193 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.math
-
-/**
- * Dense matrix implementation of [[Matrix]]. Stores data in column major 
order in a continuous
- * double array.
- *
- * @param numRows Number of rows
- * @param numCols Number of columns
- * @param data Array of matrix elements in column major order
- */
-case class DenseMatrix(
-    val numRows: Int,
-    val numCols: Int,
-    val data: Array[Double])
-  extends Matrix
-  with Serializable{
-
-  import DenseMatrix._
-
-  require(numRows * numCols == data.length, s"The number of values 
${data.length} does " +
-    s"not correspond to its dimensions ($numRows, $numCols).")
-
-  /**
-   * Element wise access function
-   *
-   * @param row row index
-   * @param col column index
-   * @return matrix entry at (row, col)
-   */
-  override def apply(row: Int, col: Int): Double = {
-    val index = locate(row, col)
-
-    data(index)
-  }
-
-  override def toString: String = {
-    val result = StringBuilder.newBuilder
-    result.append(s"DenseMatrix($numRows, $numCols)\n")
-
-    val linewidth = LINE_WIDTH
-
-    val columnsFieldWidths = for(row <- 0 until math.min(numRows, MAX_ROWS)) 
yield {
-      var column = 0
-      var maxFieldWidth = 0
-
-      while(column * maxFieldWidth < linewidth && column < numCols) {
-        val fieldWidth = printEntry(row, column).length + 2
-
-        if(fieldWidth > maxFieldWidth) {
-          maxFieldWidth = fieldWidth
-        }
-
-        if(column * maxFieldWidth < linewidth) {
-          column += 1
-        }
-      }
-
-      (column, maxFieldWidth)
-    }
-
-    val (columns, fieldWidths) = columnsFieldWidths.unzip
-
-    val maxColumns = columns.min
-    val fieldWidth = fieldWidths.max
-
-    for(row <- 0 until math.min(numRows, MAX_ROWS)) {
-      for(col <- 0 until maxColumns) {
-        val str = printEntry(row, col)
-
-        result.append(" " * (fieldWidth - str.length) + str)
-      }
-
-      if(maxColumns < numCols) {
-        result.append("...")
-      }
-
-      result.append("\n")
-    }
-
-    if(numRows > MAX_ROWS) {
-      result.append("...\n")
-    }
-
-    result.toString()
-  }
-
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case dense: DenseMatrix =>
-        numRows == dense.numRows && numCols == dense.numCols && 
data.sameElements(dense.data)
-      case _ => false
-    }
-  }
-
-  override def hashCode: Int = {
-    val hashCodes = List(numRows.hashCode(), numCols.hashCode(), 
java.util.Arrays.hashCode(data))
-
-    hashCodes.foldLeft(3){(left, right) => left * 41 + right}
-  }
-
-  /** Element wise update function
-    *
-    * @param row row index
-    * @param col column index
-    * @param value value to set at (row, col)
-    */
-  override def update(row: Int, col: Int, value: Double): Unit = {
-    val index = locate(row, col)
-
-    data(index) = value
-  }
-
-  def toSparseMatrix: SparseMatrix = {
-    val entries = for(row <- 0 until numRows; col <- 0 until numCols) yield {
-      (row, col, apply(row, col))
-    }
-
-    SparseMatrix.fromCOO(numRows, numCols, entries.filter(_._3 != 0))
-  }
-
-  /** Calculates the linear index of the respective matrix entry
-    *
-    * @param row
-    * @param col
-    * @return
-    */
-  private def locate(row: Int, col: Int): Int = {
-    require(0 <= row && row < numRows && 0 <= col && col < numCols,
-      (row, col) + " not in [0, " + numRows + ") x [0, " + numCols + ")")
-
-    row + col * numRows
-  }
-
-  /** Converts the entry at (row, col) to string
-    *
-    * @param row
-    * @param col
-    * @return
-    */
-  private def printEntry(row: Int, col: Int): String = {
-    val index = locate(row, col)
-
-    data(index).toString
-  }
-
-  /** Copies the matrix instance
-    *
-    * @return Copy of itself
-    */
-  override def copy: DenseMatrix = {
-    new DenseMatrix(numRows, numCols, data.clone)
-  }
-}
-
-object DenseMatrix {
-
-  val LINE_WIDTH = 100
-  val MAX_ROWS = 50
-
-  def apply(numRows: Int, numCols: Int, values: Array[Int]): DenseMatrix = {
-    new DenseMatrix(numRows, numCols, values.map(_.toDouble))
-  }
-
-  def apply(numRows: Int, numCols: Int, values: Double*): DenseMatrix = {
-    new DenseMatrix(numRows, numCols, values.toArray)
-  }
-
-  def zeros(numRows: Int, numCols: Int): DenseMatrix = {
-    new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(0.0))
-  }
-
-  def eye(numRows: Int, numCols: Int): DenseMatrix = {
-    new DenseMatrix(numRows, numCols, Array.fill(numRows * numCols)(1.0))
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
deleted file mode 100644
index 5e70741..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/DenseVector.scala
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.math
-
-import breeze.linalg.{SparseVector => BreezeSparseVector, DenseVector => 
BreezeDenseVector, Vector => BreezeVector}
-
-/**
- * Dense vector implementation of [[Vector]]. The data is represented in a 
continuous array of
- * doubles.
- *
- * @param data Array of doubles to store the vector elements
- */
-case class DenseVector(
-    data: Array[Double])
-  extends Vector
-  with Serializable {
-
-  /**
-   * Number of elements in a vector
-   * @return
-   */
-  override def size: Int = {
-    data.length
-  }
-
-  /**
-   * Element wise access function
-   *
-   * @param index index of the accessed element
-   * @return element at the given index
-   */
-  override def apply(index: Int): Double = {
-    require(0 <= index && index < data.length, index + " not in [0, " + 
data.length + ")")
-    data(index)
-  }
-
-  override def toString: String = {
-    s"DenseVector(${data.mkString(", ")})"
-  }
-
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case dense: DenseVector => data.length == dense.data.length && 
data.sameElements(dense.data)
-      case _ => false
-    }
-  }
-
-  override def hashCode: Int = {
-    java.util.Arrays.hashCode(data)
-  }
-
-  /**
-   * Copies the vector instance
-   *
-   * @return Copy of the vector instance
-   */
-  override def copy: DenseVector = {
-    DenseVector(data.clone())
-  }
-
-  /** Updates the element at the given index with the provided value
-    *
-    * @param index Index whose value is updated.
-    * @param value The value used to update the index.
-    */
-  override def update(index: Int, value: Double): Unit = {
-    require(0 <= index && index < data.length, index + " not in [0, " + 
data.length + ")")
-
-    data(index) = value
-  }
-
-  /** Returns the dot product of the recipient and the argument
-    *
-    * @param other a Vector
-    * @return a scalar double of dot product
-    */
-  override def dot(other: Vector): Double = {
-    require(size == other.size, "The size of vector must be equal.")
-
-    other match {
-      case SparseVector(_, otherIndices, otherData) =>
-        otherIndices.zipWithIndex.map {
-          case (idx, sparseIdx) => data(idx) * otherData(sparseIdx)
-        }.sum
-      case _ => (0 until size).map(i => data(i) * other(i)).sum
-    }
-  }
-
-  /** Returns the outer product (a.k.a. Kronecker product) of `this`
-    * with `other`. The result will given in 
[[org.apache.flink.ml.math.SparseMatrix]]
-    * representation if `other` is sparse and as 
[[org.apache.flink.ml.math.DenseMatrix]] otherwise.
-    *
-    * @param other a Vector
-    * @return the [[org.apache.flink.ml.math.Matrix]] which equals the outer 
product of `this`
-    *         with `other.`
-    */
-  override def outer(other: Vector): Matrix = {
-    val numRows = size
-    val numCols = other.size
-
-    other match {
-      case sv: SparseVector =>
-        val entries = for {
-          i <- 0 until numRows
-          (j, k) <- sv.indices.zipWithIndex
-          value = this(i) * sv.data(k)
-          if value != 0
-        } yield (i, j, value)
-
-        SparseMatrix.fromCOO(numRows, numCols, entries)
-      case _ =>
-        val values = for {
-          i <- 0 until numRows
-          j <- 0 until numCols
-        } yield this(i) * other(j)
-
-        DenseMatrix(numRows, numCols, values.toArray)
-    }
-  }
-
-  /** Magnitude of a vector
-    *
-    * @return
-    */
-  override def magnitude: Double = math.sqrt(data.map(x => x * x).sum)
-
-  def toSparseVector: SparseVector = {
-    val nonZero = (0 until size).zip(data).filter(_._2 != 0)
-
-    SparseVector.fromCOO(size, nonZero)
-  }
-}
-
-object DenseVector {
-
-  def apply(values: Double*): DenseVector = {
-    new DenseVector(values.toArray)
-  }
-
-  def apply(values: Array[Int]): DenseVector = {
-    new DenseVector(values.map(_.toDouble))
-  }
-
-  def zeros(size: Int): DenseVector = {
-    init(size, 0.0)
-  }
-
-  def eye(size: Int): DenseVector = {
-    init(size, 1.0)
-  }
-
-  def init(size: Int, value: Double): DenseVector = {
-    new DenseVector(Array.fill(size)(value))
-  }
-
-  /** BreezeVectorConverter implementation for 
[[org.apache.flink.ml.math.DenseVector]]
-    *
-    * This allows to convert Breeze vectors into [[DenseVector]].
-    */
-  implicit val denseVectorConverter = new BreezeVectorConverter[DenseVector] {
-    override def convert(vector: BreezeVector[Double]): DenseVector = {
-      vector match {
-        case dense: BreezeDenseVector[Double] => new DenseVector(dense.data)
-        case sparse: BreezeSparseVector[Double] => new 
DenseVector(sparse.toDenseVector.data)
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala
deleted file mode 100644
index ba6a781..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Matrix.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.math
-
-/** Base trait for a matrix representation
-  *
-  */
-trait Matrix {
-
-  /** Number of rows
-    *
-    * @return
-    */
-  def numRows: Int
-
-  /** Number of columns
-    *
-    * @return
-    */
-  def numCols: Int
-
-  /** Element wise access function
-    *
-    * @param row row index
-    * @param col column index
-    * @return matrix entry at (row, col)
-    */
-  def apply(row: Int, col: Int): Double
-
-  /** Element wise update function
-    *
-    * @param row row index
-    * @param col column index
-    * @param value value to set at (row, col)
-    */
-  def update(row: Int, col: Int, value: Double): Unit
-
-  /** Copies the matrix instance
-    *
-    * @return Copy of itself
-    */
-  def copy: Matrix
-
-  def equalsMatrix(matrix: Matrix): Boolean = {
-    if(numRows == matrix.numRows && numCols == matrix.numCols) {
-      val coordinates = for(row <- 0 until numRows; col <- 0 until numCols) 
yield (row, col)
-      coordinates forall { case(row, col) => this.apply(row, col) == 
matrix(row, col)}
-    } else {
-      false
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala
deleted file mode 100644
index fe58ddb..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseMatrix.scala
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.math
-
-import scala.util.Sorting
-
-/** Sparse matrix using the compressed sparse column (CSC) representation.
-  *
-  * More details concerning the compressed sparse column (CSC) representation 
can be found
-  * 
[http://en.wikipedia.org/wiki/Sparse_matrix#Compressed_sparse_column_.28CSC_or_CCS.29].
-  *
-  * @param numRows Number of rows
-  * @param numCols Number of columns
-  * @param rowIndices Array containing the row indices of non-zero entries
-  * @param colPtrs Array containing the starting offsets in data for each 
column
-  * @param data Array containing the non-zero entries in column-major order
-  */
-class SparseMatrix(
-    val numRows: Int,
-    val numCols: Int,
-    val rowIndices: Array[Int],
-    val colPtrs: Array[Int],
-    val data: Array[Double])
-  extends Matrix
-  with Serializable {
-
-  /** Element wise access function
-    *
-    * @param row row index
-    * @param col column index
-    * @return matrix entry at (row, col)
-    */
-  override def apply(row: Int, col: Int): Double = {
-
-    val index = locate(row, col)
-
-    if(index < 0){
-      0
-    } else {
-     data(index)
-    }
-  }
-
-  def toDenseMatrix: DenseMatrix = {
-    val result = DenseMatrix.zeros(numRows, numCols)
-
-    for(row <- 0 until numRows; col <- 0 until numCols) {
-      result(row, col) = apply(row, col)
-    }
-
-    result
-  }
-
-  /** Element wise update function
-    *
-    * @param row row index
-    * @param col column index
-    * @param value value to set at (row, col)
-    */
-  override def update(row: Int, col: Int, value: Double): Unit = {
-    val index = locate(row, col)
-
-    if(index < 0) {
-      throw new IllegalArgumentException("Cannot update zero value of sparse 
matrix at index " +
-      s"($row, $col)")
-    } else {
-      data(index) = value
-    }
-  }
-
-  override def toString: String = {
-    val result = StringBuilder.newBuilder
-
-    result.append(s"SparseMatrix($numRows, $numCols)\n")
-
-    var columnIndex = 0
-
-    val fieldWidth = math.max(numRows, numCols).toString.length
-    val valueFieldWidth = data.map(_.toString.length).max + 2
-
-    for(index <- 0 until colPtrs.last) {
-      while(colPtrs(columnIndex + 1) <= index){
-        columnIndex += 1
-      }
-
-      val rowStr = rowIndices(index).toString
-      val columnStr = columnIndex.toString
-      val valueStr = data(index).toString
-
-      result.append("(" + " " * (fieldWidth - rowStr.length) + rowStr + "," +
-        " " * (fieldWidth - columnStr.length) + columnStr + ")")
-      result.append(" " * (valueFieldWidth - valueStr.length) + valueStr)
-      result.append("\n")
-    }
-
-    result.toString
-  }
-
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case sm: SparseMatrix if numRows == sm.numRows && numCols == sm.numCols 
=>
-        rowIndices.sameElements(sm.rowIndices) && 
colPtrs.sameElements(sm.colPtrs) &&
-        data.sameElements(sm.data)
-      case _ => false
-    }
-  }
-
-  override def hashCode: Int = {
-    val hashCodes = List(numRows.hashCode(), numCols.hashCode(),
-      java.util.Arrays.hashCode(rowIndices), 
java.util.Arrays.hashCode(colPtrs),
-      java.util.Arrays.hashCode(data))
-
-    hashCodes.foldLeft(5){(left, right) => left * 41 + right}
-  }
-
-  private def locate(row: Int, col: Int): Int = {
-    require(0 <= row && row < numRows && 0 <= col && col < numCols,
-      (row, col) + " not in [0, " + numRows + ") x [0, " + numCols + ")")
-
-    val startIndex = colPtrs(col)
-    val endIndex = colPtrs(col + 1)
-
-    java.util.Arrays.binarySearch(rowIndices, startIndex, endIndex, row)
-  }
-
-  /** Copies the matrix instance
-    *
-    * @return Copy of itself
-    */
-  override def copy: SparseMatrix = {
-    new SparseMatrix(numRows, numCols, rowIndices.clone, colPtrs.clone(), 
data.clone)
-  }
-}
-
-object SparseMatrix{
-
-  /** Constructs a sparse matrix from a coordinate list (COO) representation 
where each entry
-    * is stored as a tuple of (rowIndex, columnIndex, value).
-    * @param numRows
-    * @param numCols
-    * @param entries
-    * @return
-    */
-  def fromCOO(numRows: Int, numCols: Int, entries: (Int, Int, Double)*): 
SparseMatrix = {
-    fromCOO(numRows, numCols, entries)
-  }
-
-  /** Constructs a sparse matrix from a coordinate list (COO) representation 
where each entry
-    * is stored as a tuple of (rowIndex, columnIndex, value).
-    *
-    * @param numRows
-    * @param numCols
-    * @param entries
-    * @return
-    */
-  def fromCOO(numRows: Int, numCols: Int, entries: Iterable[(Int, Int, 
Double)]): SparseMatrix = {
-    val entryArray = entries.toArray
-
-    entryArray.foreach{ case (row, col, _) =>
-      require(0 <= row && row < numRows && 0 <= col && col <= numCols,
-        (row, col) + " not in [0, " + numRows + ") x [0, " + numCols + ")")
-    }
-
-    val COOOrdering = new Ordering[(Int, Int, Double)] {
-      override def compare(x: (Int, Int, Double), y: (Int, Int, Double)): Int 
= {
-        if(x._2 < y._2) {
-          -1
-        } else if(x._2 > y._2) {
-          1
-        } else {
-          x._1 - y._1
-        }
-      }
-    }
-
-    Sorting.quickSort(entryArray)(COOOrdering)
-
-    val nnz = entryArray.length
-
-    val data = new Array[Double](nnz)
-    val rowIndices = new Array[Int](nnz)
-    val colPtrs = new Array[Int](numCols + 1)
-
-    var (lastRow, lastCol, lastValue) = entryArray(0)
-
-    rowIndices(0) = lastRow
-    data(0) = lastValue
-
-    var i = 1
-    var lastDataIndex = 0
-
-    while(i < nnz) {
-      val (curRow, curCol, curValue) = entryArray(i)
-
-      if(lastRow == curRow && lastCol == curCol) {
-        // add values with identical coordinates
-        data(lastDataIndex) += curValue
-      } else {
-        lastDataIndex += 1
-        data(lastDataIndex) = curValue
-        rowIndices(lastDataIndex) = curRow
-        lastRow = curRow
-      }
-
-      while(lastCol < curCol) {
-        lastCol += 1
-        colPtrs(lastCol) = lastDataIndex
-      }
-
-      i += 1
-    }
-
-    lastDataIndex += 1
-    while(lastCol < numCols) {
-      colPtrs(lastCol + 1) = lastDataIndex
-      lastCol += 1
-    }
-
-    val prunedRowIndices = if(lastDataIndex < nnz) {
-      val prunedArray = new Array[Int](lastDataIndex)
-      rowIndices.copyToArray(prunedArray)
-      prunedArray
-    } else {
-      rowIndices
-    }
-
-    val prunedData = if(lastDataIndex < nnz) {
-      val prunedArray = new Array[Double](lastDataIndex)
-      data.copyToArray(prunedArray)
-      prunedArray
-    } else {
-      data
-    }
-
-    new SparseMatrix(numRows, numCols, prunedRowIndices, colPtrs, prunedData)
-  }
-
-  /** Convenience method to convert a single tuple with an integer value into 
a SparseMatrix.
-    * The problem is that providing a single tuple to the fromCOO method, the 
Scala type inference
-    * cannot infer that the tuple has to be of type (Int, Int, Double) because 
of the overloading
-    * with the Iterable type.
-    *
-    * @param numRows
-    * @param numCols
-    * @param entry
-    * @return
-    */
-  def fromCOO(numRows: Int, numCols: Int, entry: (Int, Int, Int)): 
SparseMatrix = {
-    fromCOO(numRows, numCols, (entry._1, entry._2, entry._3.toDouble))
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
deleted file mode 100644
index fec018f..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/SparseVector.scala
+++ /dev/null
@@ -1,285 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.math
-
-import breeze.linalg.{SparseVector => BreezeSparseVector, DenseVector => 
BreezeDenseVector, Vector => BreezeVector}
-
-import scala.util.Sorting
-
-/** Sparse vector implementation storing the data in two arrays. One index 
contains the sorted
-  * indices of the non-zero vector entries and the other the corresponding 
vector entries
-  */
-case class SparseVector(
-    size: Int,
-    indices: Array[Int],
-    data: Array[Double])
-  extends Vector
-  with Serializable {
-
-  /** Updates the element at the given index with the provided value
-    *
-    * @param index Index whose value is updated.
-    * @param value The value used to update the index.
-    */
-  override def update(index: Int, value: Double): Unit = {
-    val resolvedIndex = locate(index)
-
-    if (resolvedIndex < 0) {
-      throw new IllegalArgumentException("Cannot update zero value of sparse 
vector at index " +
-        index)
-    } else {
-      data(resolvedIndex) = value
-    }
-  }
-
-  /** Copies the vector instance
-    *
-    * @return Copy of the vector instance
-    */
-  override def copy: SparseVector = {
-    new SparseVector(size, indices.clone, data.clone)
-  }
-
-  /** Returns the dot product of the recipient and the argument
-    *
-    * @param other a Vector
-    * @return a scalar double of dot product
-    */
-  override def dot(other: Vector): Double = {
-    require(size == other.size, "The size of vector must be equal.")
-    other match {
-      case DenseVector(otherData) =>
-        indices.zipWithIndex.map { case (sparseIdx, idx) => data(idx) * 
otherData(sparseIdx) }.sum
-      case SparseVector(_, otherIndices, otherData) =>
-        var left = 0
-        var right = 0
-        var result = 0.0
-
-        while (left < indices.length && right < otherIndices.length) {
-          if (indices(left) < otherIndices(right)) {
-            left += 1
-          } else if (otherIndices(right) < indices(left)) {
-            right += 1
-          } else {
-            result += data(left) * otherData(right)
-            left += 1
-            right += 1
-          }
-        }
-        result
-    }
-  }
-
-  /** Returns the outer product (a.k.a. Kronecker product) of `this`
-    * with `other`. The result is given in 
[[org.apache.flink.ml.math.SparseMatrix]]
-    * representation.
-    *
-    * @param other a Vector
-    * @return the [[org.apache.flink.ml.math.SparseMatrix]] which equals the 
outer product of `this`
-    *         with `other.`
-    */
-  override def outer(other: Vector): SparseMatrix = {
-    val numRows = size
-    val numCols = other.size
-
-    val entries = other match {
-      case sv: SparseVector =>
-       for {
-          (i, k) <- indices.zipWithIndex
-          (j, l) <- sv.indices.zipWithIndex
-          value = data(k) * sv.data(l)
-          if value != 0
-        } yield (i, j, value)
-      case _ =>
-        for {
-          (i, k) <- indices.zipWithIndex
-          j <- 0 until numCols
-          value = data(k) * other(j)
-          if value != 0
-        } yield (i, j, value)
-    }
-
-    SparseMatrix.fromCOO(numRows, numCols, entries)
-  }
-
-
-  /** Magnitude of a vector
-    *
-    * @return
-    */
-  override def magnitude: Double = math.sqrt(data.map(x => x * x).sum)
-
-  /** Element wise access function
-    *
-    * * @param index index of the accessed element
-    * @return element with index
-    */
-  override def apply(index: Int): Double = {
-    val resolvedIndex = locate(index)
-
-    if(resolvedIndex < 0) {
-      0
-    } else {
-      data(resolvedIndex)
-    }
-  }
-
-  def toDenseVector: DenseVector = {
-    val denseVector = DenseVector.zeros(size)
-
-    for(index <- 0 until size) {
-      denseVector(index) = this(index)
-    }
-
-    denseVector
-  }
-
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case sv: SparseVector if size == sv.size =>
-        indices.sameElements(sv.indices) && data.sameElements(sv.data)
-      case _ => false
-    }
-  }
-
-  override def hashCode: Int = {
-    val hashCodes = List(size.hashCode, java.util.Arrays.hashCode(indices),
-      java.util.Arrays.hashCode(data))
-
-    hashCodes.foldLeft(3){ (left, right) => left * 41 + right}
-  }
-
-  override def toString: String = {
-    val entries = indices.zip(data).mkString(", ")
-    "SparseVector(" + entries + ")"
-  }
-
-  private def locate(index: Int): Int = {
-    require(0 <= index && index < size, index + " not in [0, " + size + ")")
-
-    java.util.Arrays.binarySearch(indices, 0, indices.length, index)
-  }
-}
-
-object SparseVector {
-
-  /** Constructs a sparse vector from a coordinate list (COO) representation 
where each entry
-    * is stored as a tuple of (index, value).
-    *
-    * @param size
-    * @param entries
-    * @return
-    */
-  def fromCOO(size: Int, entries: (Int, Double)*): SparseVector = {
-    fromCOO(size, entries)
-  }
-
-  /** Constructs a sparse vector from a coordinate list (COO) representation 
where each entry
-    * is stored as a tuple of (index, value).
-    *
-    * @param size
-    * @param entries
-    * @return
-    */
-  def fromCOO(size: Int, entries: Iterable[(Int, Double)]): SparseVector = {
-    val entryArray = entries.toArray
-
-    entryArray.foreach { case (index, _) =>
-      require(0 <= index && index < size, index + " not in [0, " + size + ")")
-    }
-
-    val COOOrdering = new Ordering[(Int, Double)] {
-      override def compare(x: (Int, Double), y: (Int, Double)): Int = {
-        x._1 - y._1
-      }
-    }
-
-    Sorting.quickSort(entryArray)(COOOrdering)
-
-    // calculate size of the array
-    val arraySize = entryArray.foldLeft((-1, 0)){ case ((lastIndex, numRows), 
(index, _)) =>
-      if(lastIndex == index) {
-        (lastIndex, numRows)
-      } else {
-        (index, numRows + 1)
-      }
-    }._2
-
-    val indices = new Array[Int](arraySize)
-    val data = new Array[Double](arraySize)
-
-    val (index, value) = entryArray(0)
-
-    indices(0) = index
-    data(0) = value
-
-    var i = 1
-    var lastIndex = indices(0)
-    var lastDataIndex = 0
-
-    while(i < entryArray.length) {
-      val (curIndex, curValue) = entryArray(i)
-
-      if(curIndex == lastIndex) {
-        data(lastDataIndex) += curValue
-      } else {
-        lastDataIndex += 1
-        data(lastDataIndex) = curValue
-        indices(lastDataIndex) = curIndex
-        lastIndex = curIndex
-      }
-
-      i += 1
-    }
-
-    new SparseVector(size, indices, data)
-  }
-
-  /** Convenience method to be able to instantiate a SparseVector with a 
single element. The Scala
-    * type inference mechanism cannot infer that the second tuple value has to 
be of type Double
-    * if only a single tuple is provided.
-    *
-    * @param size
-    * @param entry
-    * @return
-    */
-  def fromCOO(size: Int, entry: (Int, Int)): SparseVector = {
-    fromCOO(size, (entry._1, entry._2.toDouble))
-  }
-
-  /** BreezeVectorConverter implementation for 
[[org.apache.flink.ml.math.SparseVector]]
-    *
-    * This allows to convert Breeze vectors into [[SparseVector]]
-    */
-  implicit val sparseVectorConverter = new BreezeVectorConverter[SparseVector] 
{
-    override def convert(vector: BreezeVector[Double]): SparseVector = {
-      vector match {
-        case dense: BreezeDenseVector[Double] =>
-          SparseVector.fromCOO(
-            dense.length,
-            dense.iterator.toIterable)
-        case sparse: BreezeSparseVector[Double] =>
-          new SparseVector(
-            sparse.length,
-            sparse.index.take(sparse.used),
-            sparse.data.take(sparse.used))
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
deleted file mode 100644
index e52328d..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/Vector.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.math
-
-import breeze.linalg.{SparseVector => BreezeSparseVector, DenseVector => 
BreezeDenseVector, Vector => BreezeVector}
-
-/** Base trait for Vectors
-  *
-  */
-trait Vector extends Serializable {
-
-  /** Number of elements in a vector
-    *
-    * @return
-    */
-  def size: Int
-
-  /** Element wise access function
-    *
-    * * @param index index of the accessed element
-    * @return element with index
-    */
-  def apply(index: Int): Double
-
-  /** Updates the element at the given index with the provided value
-    *
-    * @param index
-    * @param value
-    */
-  def update(index: Int, value: Double): Unit
-
-  /** Copies the vector instance
-    *
-    * @return Copy of the vector instance
-    */
-  def copy: Vector
-
-  /** Returns the dot product of the recipient and the argument
-    *
-    * @param other a Vector
-    * @return a scalar double of dot product
-    */
-  def dot(other: Vector): Double
-
-  /** Returns the outer product of the recipient and the argument
-    *
-    *
-    * @param other a Vector
-    * @return a matrix
-    */
-  def outer(other: Vector): Matrix
-
-  /** Magnitude of a vector
-    *
-    * @return
-    */
-  def magnitude: Double
-
-  def equalsVector(vector: Vector): Boolean = {
-    if(size == vector.size) {
-      (0 until size) forall { idx =>
-        this(idx) == vector(idx)
-      }
-    } else {
-      false
-    }
-  }
-}
-
-object Vector{
-  /** BreezeVectorConverter implementation for [[Vector]]
-    *
-    * This allows to convert Breeze vectors into [[Vector]].
-    */
-  implicit val vectorConverter = new BreezeVectorConverter[Vector] {
-    override def convert(vector: BreezeVector[Double]): Vector = {
-      vector match {
-        case dense: BreezeDenseVector[Double] => new DenseVector(dense.data)
-
-        case sparse: BreezeSparseVector[Double] =>
-          new SparseVector(
-            sparse.length,
-            sparse.index.take(sparse.used),
-            sparse.data.take(sparse.used))
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
deleted file mode 100644
index 3bbf146..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/VectorBuilder.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.math
-
-/** Type class to allow the vector construction from different data types
-  *
-  * @tparam T Subtype of [[Vector]]
-  */
-trait VectorBuilder[T <: Vector] extends Serializable {
-  /** Builds a [[Vector]] of type T from a List[Double]
-    *
-    * @param data Input data where the index denotes the resulting index of 
the vector
-    * @return A vector of type T
-    */
-  def build(data: List[Double]): T
-}
-
-object VectorBuilder{
-
-  /** Type class implementation for [[org.apache.flink.ml.math.DenseVector]] */
-  implicit val denseVectorBuilder = new VectorBuilder[DenseVector] {
-    override def build(data: List[Double]): DenseVector = {
-      new DenseVector(data.toArray)
-    }
-  }
-
-  /** Type class implementation for [[org.apache.flink.ml.math.SparseVector]] 
*/
-  implicit val sparseVectorBuilder = new VectorBuilder[SparseVector] {
-    override def build(data: List[Double]): SparseVector = {
-      // Enrich elements with explicit indices and filter out zero entries
-      SparseVector.fromCOO(data.length, data.indices.zip(data).filter(_._2 != 
0.0))
-    }
-  }
-
-  /** Type class implementation for [[Vector]] */
-  implicit val vectorBuilder = new VectorBuilder[Vector] {
-    override def build(data: List[Double]): Vector = {
-      new DenseVector(data.toArray)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
deleted file mode 100644
index 4c7f254..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/math/package.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml
-
-/**
- * Convenience methods to handle Flink's [[org.apache.flink.ml.math.Matrix]] 
and [[Vector]]
- * abstraction.
- */
-package object math {
-  implicit class RichMatrix(matrix: Matrix) extends Iterable[(Int, Int, 
Double)] {
-
-    override def iterator: Iterator[(Int, Int, Double)] = {
-      new Iterator[(Int, Int, Double)] {
-        var index = 0
-
-        override def hasNext: Boolean = {
-          index < matrix.numRows * matrix.numCols
-        }
-
-        override def next(): (Int, Int, Double) = {
-          val row = index % matrix.numRows
-          val column = index / matrix.numRows
-
-          index += 1
-
-          (row, column, matrix(row, column))
-        }
-      }
-    }
-
-    def valueIterator: Iterator[Double] = {
-      val it = iterator
-
-      new Iterator[Double] {
-        override def hasNext: Boolean = it.hasNext
-
-        override def next(): Double = it.next._3
-      }
-    }
-
-  }
-
-  implicit class RichVector(vector: Vector) extends Iterable[(Int, Double)] {
-
-    override def iterator: Iterator[(Int, Double)] = {
-      new Iterator[(Int, Double)] {
-        var index = 0
-
-        override def hasNext: Boolean = {
-          index < vector.size
-        }
-
-        override def next(): (Int, Double) = {
-          val resultIndex = index
-
-          index += 1
-
-          (resultIndex, vector(resultIndex))
-        }
-      }
-    }
-
-    def valueIterator: Iterator[Double] = {
-      val it = iterator
-
-      new Iterator[Double] {
-        override def hasNext: Boolean = it.hasNext
-
-        override def next(): Double = it.next._2
-      }
-    }
-  }
-
-  /** Stores the vector values in a dense array
-    *
-    * @param vector
-    * @return Array containing the vector values
-    */
-  def vector2Array(vector: Vector): Array[Double] = {
-    vector match {
-      case dense: DenseVector => dense.data.clone
-
-      case sparse: SparseVector =>
-        val result = new Array[Double](sparse.size)
-
-        for((index, value) <- sparse) {
-          result(index) = value
-        }
-
-        result
-
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ChebyshevDistanceMetric.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ChebyshevDistanceMetric.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ChebyshevDistanceMetric.scala
deleted file mode 100644
index 055ede3..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ChebyshevDistanceMetric.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.metrics.distances
-
-import org.apache.flink.ml.math.Vector
-
-/** This class implements a Chebyshev distance metric. The class calculates 
the distance between
-  * the given vectors by finding the maximum difference between each 
coordinate.
-  *
-  * @see http://en.wikipedia.org/wiki/Chebyshev_distance
-  */
-class ChebyshevDistanceMetric extends DistanceMetric {
-  override def distance(a: Vector, b: Vector): Double = {
-    checkValidArguments(a, b)
-    (0 until a.size).map(i => math.abs(a(i) - b(i))).max
-  }
-}
-
-object ChebyshevDistanceMetric {
-  def apply() = new ChebyshevDistanceMetric()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/CosineDistanceMetric.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/CosineDistanceMetric.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/CosineDistanceMetric.scala
deleted file mode 100644
index f32ea26..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/CosineDistanceMetric.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.metrics.distances
-
-import org.apache.flink.ml.math.Vector
-
-/** This class implements a cosine distance metric. The class calculates the 
distance between
-  * the given vectors by dividing the dot product of two vectors by the 
product of their lengths.
-  * We convert the result of division to a usable distance. So, 1 - cos(angle) 
is actually returned.
-  *
-  * @see http://en.wikipedia.org/wiki/Cosine_similarity
-  */
-class CosineDistanceMetric extends DistanceMetric {
-  override def distance(a: Vector, b: Vector): Double = {
-    checkValidArguments(a, b)
-
-    val dotProd: Double = a.dot(b)
-    val denominator: Double = a.magnitude * b.magnitude
-    if (dotProd == 0 && denominator == 0) {
-      0
-    } else {
-      1 - dotProd / denominator
-    }
-  }
-}
-
-object CosineDistanceMetric {
-  def apply() = new CosineDistanceMetric()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala
deleted file mode 100644
index 21573fe..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/DistanceMetric.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.metrics.distances
-
-import org.apache.flink.ml.math.Vector
-
-/** DistanceMeasure interface is used for object which determines distance 
between two points.
-  */
-trait DistanceMetric extends Serializable {
-  /** Returns the distance between the arguments.
-    *
-    * @param a a Vector defining a multi-dimensional point in some space
-    * @param b a Vector defining a multi-dimensional point in some space
-    * @return a scalar double of the distance
-    */
-  def distance(a: Vector, b: Vector): Double
-
-  protected def checkValidArguments(a: Vector, b: Vector) = {
-    require(a.size == b.size, "The each size of vectors must be same to 
calculate distance.")
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/EuclideanDistanceMetric.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/EuclideanDistanceMetric.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/EuclideanDistanceMetric.scala
deleted file mode 100644
index 153fb93..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/EuclideanDistanceMetric.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.metrics.distances
-
-import org.apache.flink.ml.math.Vector
-
-/** This class implements a Euclidean distance metric. The metric calculates 
the distance between
-  * the given two vectors by summing the square root of the squared 
differences between
-  * each coordinate.
-  *
-  * http://en.wikipedia.org/wiki/Euclidean_distance
-  *
-  * If you don't care about the true distance and only need for comparison,
-  * [[SquaredEuclideanDistanceMetric]] will be faster because it doesn't 
calculate the actual
-  * square root of the distances.
-  *
-  * @see http://en.wikipedia.org/wiki/Euclidean_distance
-  */
-class EuclideanDistanceMetric extends SquaredEuclideanDistanceMetric {
-  override def distance(a: Vector, b: Vector): Double = 
math.sqrt(super.distance(a, b))
-}
-
-object EuclideanDistanceMetric {
-  def apply() = new EuclideanDistanceMetric()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ManhattanDistanceMetric.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ManhattanDistanceMetric.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ManhattanDistanceMetric.scala
deleted file mode 100644
index 5983f79..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/ManhattanDistanceMetric.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.metrics.distances
-
-import org.apache.flink.ml.math.Vector
-
-/** This class implements a Manhattan distance metric. The class calculates 
the distance between
-  * the given vectors by summing the differences between each coordinate.
-  *
-  * @see http://en.wikipedia.org/wiki/Taxicab_geometry
-  */
-class ManhattanDistanceMetric extends DistanceMetric{
-  override def distance(a: Vector, b: Vector): Double = {
-    checkValidArguments(a, b)
-    (0 until a.size).map(i => math.abs(a(i) - b(i))).sum
-  }
-}
-
-object ManhattanDistanceMetric {
-  def apply() = new ManhattanDistanceMetric()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/MinkowskiDistanceMetric.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/MinkowskiDistanceMetric.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/MinkowskiDistanceMetric.scala
deleted file mode 100644
index 50161d4..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/MinkowskiDistanceMetric.scala
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.metrics.distances
-
-import org.apache.flink.ml.math.Vector
-
-/** This class implements a Minkowski distance metric. The metric is a 
generalization of
-  * L(p) distances: Euclidean distance and Manhattan distance. If you need for 
a special case of
-  * p = 1 or p = 2, use [[ManhattanDistanceMetric]], 
[[EuclideanDistanceMetric]]. This class is
-  * useful for high exponents.
-  *
-  * @param p the norm exponent of space
-  *
-  * @see http://en.wikipedia.org/wiki/Minkowski_distance
-  */
-class MinkowskiDistanceMetric(val p: Double) extends DistanceMetric {
-  override def distance(a: Vector, b: Vector): Double = {
-    checkValidArguments(a, b)
-    math.pow((0 until a.size).map(i => math.pow(math.abs(a(i) - b(i)), 
p)).sum, 1 / p)
-  }
-}
-
-object MinkowskiDistanceMetric {
-  def apply(p: Double) = new MinkowskiDistanceMetric(p)
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/SquaredEuclideanDistanceMetric.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/SquaredEuclideanDistanceMetric.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/SquaredEuclideanDistanceMetric.scala
deleted file mode 100644
index fe546e9..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/SquaredEuclideanDistanceMetric.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.metrics.distances
-
-import org.apache.flink.ml.math.Vector
-
-/** This class is like [[EuclideanDistanceMetric]] but it does not take the 
square root.
-  *
-  * The value calculated by this class is not exact Euclidean distance, but it 
saves on computation
-  * when you need the value for only comparison.
-  */
-class SquaredEuclideanDistanceMetric extends DistanceMetric {
-  override def distance(a: Vector, b: Vector): Double = {
-    checkValidArguments(a, b)
-    (0 until a.size).map(i => math.pow(a(i) - b(i), 2)).sum
-  }
-}
-
-object SquaredEuclideanDistanceMetric {
-  def apply() = new SquaredEuclideanDistanceMetric()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/TanimotoDistanceMetric.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/TanimotoDistanceMetric.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/TanimotoDistanceMetric.scala
deleted file mode 100644
index 5141c98..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/metrics/distances/TanimotoDistanceMetric.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.metrics.distances
-
-import org.apache.flink.ml.math.Vector
-
-/** This class implements a Tanimoto distance metric. The class calculates the 
distance between
-  * the given vectors. The vectors are assumed as bit-wise vectors. We convert 
the result of
-  * division to a usable distance. So, 1 - similarity is actually returned.
-  *
-  * @see http://en.wikipedia.org/wiki/Jaccard_index
-  */
-class TanimotoDistanceMetric extends DistanceMetric {
-  override def distance(a: Vector, b: Vector): Double = {
-    checkValidArguments(a, b)
-
-    val dotProd: Double = a.dot(b)
-    1 - dotProd / (a.magnitude * a.magnitude + b.magnitude * b.magnitude - 
dotProd)
-  }
-}
-
-object TanimotoDistanceMetric {
-  def apply() = new TanimotoDistanceMetric()
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
deleted file mode 100644
index 78bad70..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
+++ /dev/null
@@ -1,350 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.ml.optimization
-
-import org.apache.flink.api.scala._
-import org.apache.flink.ml.common._
-import org.apache.flink.ml.math._
-import org.apache.flink.ml.optimization.IterativeSolver.{ConvergenceThreshold, 
Iterations, LearningRate}
-import org.apache.flink.ml.optimization.Solver._
-import org.apache.flink.ml._
-
-/** Base class which performs Stochastic Gradient Descent optimization using 
mini batches.
-  *
-  * For each labeled vector in a mini batch the gradient is computed and added 
to a partial
-  * gradient. The partial gradients are then summed and divided by the size of 
the batches. The
-  * average gradient is then used to updated the weight values, including 
regularization.
-  *
-  * At the moment, the whole partition is used for SGD, making it effectively 
a batch gradient
-  * descent. Once a sampling operator has been introduced, the algorithm can 
be optimized
-  *
-  *  The parameters to tune the algorithm are:
-  *                      [[Solver.LossFunction]] for the loss function to be 
used,
-  *                      [[Solver.RegularizationConstant]] for the 
regularization parameter,
-  *                      [[IterativeSolver.Iterations]] for the maximum number 
of iteration,
-  *                      [[IterativeSolver.LearningRate]] for the learning 
rate used.
-  *                      [[IterativeSolver.ConvergenceThreshold]] when 
provided the algorithm will
-  *                      stop the iterations if the relative change in the 
value of the objective
-  *                      function between successive iterations is is smaller 
than this value.
-  */
-abstract class GradientDescent extends IterativeSolver {
-
-  /** Provides a solution for the given optimization problem
-    *
-    * @param data A Dataset of LabeledVector (label, features) pairs
-    * @param initialWeights The initial weights that will be optimized
-    * @return The weights, optimized for the provided data.
-    */
-  override def optimize(
-    data: DataSet[LabeledVector],
-    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-
-    val numberOfIterations: Int = parameters(Iterations)
-    val convergenceThresholdOption: Option[Double] = 
parameters.get(ConvergenceThreshold)
-    val lossFunction = parameters(LossFunction)
-    val learningRate = parameters(LearningRate)
-    val regularizationConstant = parameters(RegularizationConstant)
-
-    // Initialize weights
-    val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
-
-    // Perform the iterations
-    convergenceThresholdOption match {
-      // No convergence criterion
-      case None =>
-        optimizeWithoutConvergenceCriterion(
-          data,
-          initialWeightsDS,
-          numberOfIterations,
-          regularizationConstant,
-          learningRate,
-          lossFunction)
-      case Some(convergence) =>
-        optimizeWithConvergenceCriterion(
-          data,
-          initialWeightsDS,
-          numberOfIterations,
-          regularizationConstant,
-          learningRate,
-          convergence,
-          lossFunction
-        )
-    }
-  }
-
-  def optimizeWithConvergenceCriterion(
-      dataPoints: DataSet[LabeledVector],
-      initialWeightsDS: DataSet[WeightVector],
-      numberOfIterations: Int,
-      regularizationConstant: Double,
-      learningRate: Double,
-      convergenceThreshold: Double,
-      lossFunction: LossFunction)
-    : DataSet[WeightVector] = {
-    // We have to calculate for each weight vector the sum of squared 
residuals,
-    // and then sum them and apply regularization
-    val initialLossSumDS = calculateLoss(dataPoints, initialWeightsDS, 
lossFunction)
-
-    // Combine weight vector with the current loss
-    val initialWeightsWithLossSum = 
initialWeightsDS.mapWithBcVariable(initialLossSumDS){
-      (weights, loss) => (weights, loss)
-    }
-
-    val resultWithLoss = 
initialWeightsWithLossSum.iterateWithTermination(numberOfIterations) {
-      weightsWithPreviousLossSum =>
-
-        // Extract weight vector and loss
-        val previousWeightsDS = weightsWithPreviousLossSum.map{_._1}
-        val previousLossSumDS = weightsWithPreviousLossSum.map{_._2}
-
-        val currentWeightsDS = SGDStep(
-          dataPoints,
-          previousWeightsDS,
-          lossFunction,
-          regularizationConstant,
-          learningRate)
-
-        val currentLossSumDS = calculateLoss(dataPoints, currentWeightsDS, 
lossFunction)
-
-        // Check if the relative change in the loss is smaller than the
-        // convergence threshold. If yes, then terminate i.e. return empty 
termination data set
-        val termination = 
previousLossSumDS.filterWithBcVariable(currentLossSumDS){
-          (previousLoss, currentLoss) => {
-            if (previousLoss <= 0) {
-              false
-            } else {
-              scala.math.abs((previousLoss - currentLoss)/previousLoss) >= 
convergenceThreshold
-            }
-          }
-        }
-
-        // Result for new iteration
-        (currentWeightsDS.mapWithBcVariable(currentLossSumDS)((w, l) => (w, 
l)), termination)
-    }
-    // Return just the weights
-    resultWithLoss.map{_._1}
-  }
-
-  def optimizeWithoutConvergenceCriterion(
-      data: DataSet[LabeledVector],
-      initialWeightsDS: DataSet[WeightVector],
-      numberOfIterations: Int,
-      regularizationConstant: Double,
-      learningRate: Double,
-      lossFunction: LossFunction)
-    : DataSet[WeightVector] = {
-    initialWeightsDS.iterate(numberOfIterations) {
-      weightVectorDS => {
-        SGDStep(data, weightVectorDS, lossFunction, regularizationConstant, 
learningRate)
-      }
-    }
-  }
-
-  /** Performs one iteration of Stochastic Gradient Descent using mini batches
-    *
-    * @param data A Dataset of LabeledVector (label, features) pairs
-    * @param currentWeights A Dataset with the current weights to be optimized 
as its only element
-    * @return A Dataset containing the weights after one stochastic gradient 
descent step
-    */
-  private def SGDStep(
-    data: DataSet[(LabeledVector)],
-    currentWeights: DataSet[WeightVector],
-    lossFunction: LossFunction,
-    regularizationConstant: Double,
-    learningRate: Double)
-  : DataSet[WeightVector] = {
-
-    data.mapWithBcVariable(currentWeights){
-      (data, weightVector) => (lossFunction.gradient(data, weightVector), 1)
-    }.reduce{
-      (left, right) =>
-        val (leftGradVector, leftCount) = left
-        val (rightGradVector, rightCount) = right
-        // Add the left gradient to the right one
-        BLAS.axpy(1.0, leftGradVector.weights, rightGradVector.weights)
-        val gradients = WeightVector(
-          rightGradVector.weights, leftGradVector.intercept + 
rightGradVector.intercept)
-
-        (gradients , leftCount + rightCount)
-    }.mapWithBcVariableIteration(currentWeights){
-      (gradientCount, weightVector, iteration) => {
-        val (WeightVector(weights, intercept), count) = gradientCount
-
-        BLAS.scal(1.0/count, weights)
-
-        val gradient = WeightVector(weights, intercept/count)
-
-        val effectiveLearningRate = learningRate/Math.sqrt(iteration)
-
-        val newWeights = takeStep(
-          weightVector.weights,
-          gradient.weights,
-          regularizationConstant,
-          effectiveLearningRate)
-
-        WeightVector(
-          newWeights,
-          weightVector.intercept - effectiveLearningRate * gradient.intercept)
-      }
-    }
-  }
-
-  /** Calculates the new weights based on the gradient
-    *
-    * @param weightVector
-    * @param gradient
-    * @param regularizationConstant
-    * @param learningRate
-    * @return
-    */
-  def takeStep(
-    weightVector: Vector,
-    gradient: Vector,
-    regularizationConstant: Double,
-    learningRate: Double
-    ): Vector
-
-  /** Calculates the regularized loss, from the data and given weights.
-    *
-    * @param data
-    * @param weightDS
-    * @param lossFunction
-    * @return
-    */
-  private def calculateLoss(
-      data: DataSet[LabeledVector],
-      weightDS: DataSet[WeightVector],
-      lossFunction: LossFunction)
-    : DataSet[Double] = {
-    data.mapWithBcVariable(weightDS){
-      (data, weightVector) => (lossFunction.loss(data, weightVector), 1)
-    }.reduce{
-      (left, right) => (left._1 + right._1, left._2 + right._2)
-    }.map {
-      lossCount => lossCount._1 / lossCount._2
-    }
-  }
-}
-
-/** Implementation of a SGD solver with L2 regularization.
-  *
-  * The regularization function is `1/2 ||w||_2^2` with `w` being the weight 
vector.
-  */
-class GradientDescentL2 extends GradientDescent {
-
-  /** Calculates the new weights based on the gradient
-    *
-    * @param weightVector
-    * @param gradient
-    * @param regularizationConstant
-    * @param learningRate
-    * @return
-    */
-  override def takeStep(
-      weightVector: Vector,
-      gradient: Vector,
-      regularizationConstant: Double,
-      learningRate: Double)
-    : Vector = {
-    // add the gradient of the L2 regularization
-    BLAS.axpy(regularizationConstant, weightVector, gradient)
-
-    // update the weights according to the learning rate
-    BLAS.axpy(-learningRate, gradient, weightVector)
-
-    weightVector
-  }
-}
-
-object GradientDescentL2 {
-  def apply() = new GradientDescentL2
-}
-
-/** Implementation of a SGD solver with L1 regularization.
-  *
-  * The regularization function is `||w||_1` with `w` being the weight vector.
-  */
-class GradientDescentL1 extends GradientDescent {
-
-  /** Calculates the new weights based on the gradient.
-    *
-    * @param weightVector
-    * @param gradient
-    * @param regularizationConstant
-    * @param learningRate
-    * @return
-    */
-  override def takeStep(
-      weightVector: Vector,
-      gradient: Vector,
-      regularizationConstant: Double,
-      learningRate: Double)
-    : Vector = {
-    // Update weight vector with gradient. L1 regularization has no gradient, 
the proximal operator
-    // does the job.
-    BLAS.axpy(-learningRate, gradient, weightVector)
-
-    // Apply proximal operator (soft thresholding)
-    val shrinkageVal = regularizationConstant * learningRate
-    var i = 0
-    while (i < weightVector.size) {
-      val wi = weightVector(i)
-      weightVector(i) = scala.math.signum(wi) *
-        scala.math.max(0.0, scala.math.abs(wi) - shrinkageVal)
-      i += 1
-    }
-
-    weightVector
-  }
-}
-
-object GradientDescentL1 {
-  def apply() = new GradientDescentL1
-}
-
-/** Implementation of a SGD solver without regularization.
-  *
-  * No regularization is applied.
-  */
-class SimpleGradientDescent extends GradientDescent {
-
-  /** Calculates the new weights based on the gradient.
-    *
-    * @param weightVector
-    * @param gradient
-    * @param regularizationConstant
-    * @param learningRate
-    * @return
-    */
-  override def takeStep(
-      weightVector: Vector,
-      gradient: Vector,
-      regularizationConstant: Double,
-      learningRate: Double)
-    : Vector = {
-    // Update the weight vector
-    BLAS.axpy(-learningRate, gradient, weightVector)
-    weightVector
-  }
-}
-
-object SimpleGradientDescent{
-  def apply() = new SimpleGradientDescent
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala
deleted file mode 100644
index 1ff5d97..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/LossFunction.scala
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.optimization
-
-import org.apache.flink.ml.common.{WeightVector, LabeledVector}
-import org.apache.flink.ml.math.BLAS
-
-/** Abstract class that implements some of the functionality for common loss 
functions
-  *
-  * A loss function determines the loss term $L(w) of the objective function  
$f(w) = L(w) +
-  * \lambda R(w)$ for prediction tasks, the other being regularization, $R(w)$.
-  *
-  * The regularization is specific to the used optimization algorithm and, 
thus, implemented there.
-  *
-  * We currently only support differentiable loss functions, in the future 
this class
-  * could be changed to DiffLossFunction in order to support other types, such 
as absolute loss.
-  */
-trait LossFunction extends Serializable {
-
-  /** Calculates the loss given the prediction and label value
-    *
-    * @param dataPoint
-    * @param weightVector
-    * @return
-    */
-  def loss(dataPoint: LabeledVector, weightVector: WeightVector): Double = {
-    lossGradient(dataPoint, weightVector)._1
-  }
-
-  /** Calculates the gradient of the loss function given a data point and 
weight vector
-    *
-    * @param dataPoint
-    * @param weightVector
-    * @return
-    */
-  def gradient(dataPoint: LabeledVector, weightVector: WeightVector): 
WeightVector = {
-    lossGradient(dataPoint, weightVector)._2
-  }
-
-  /** Calculates the gradient as well as the loss given a data point and the 
weight vector
-    *
-    * @param dataPoint
-    * @param weightVector
-    * @return
-    */
-  def lossGradient(dataPoint: LabeledVector, weightVector: WeightVector): 
(Double, WeightVector)
-}
-
-/** Generic loss function which lets you build a loss function out of the 
[[PartialLossFunction]]
-  * and the [[PredictionFunction]].
-  *
-  * @param partialLossFunction
-  * @param predictionFunction
-  */
-case class GenericLossFunction(
-    partialLossFunction: PartialLossFunction,
-    predictionFunction: PredictionFunction)
-  extends LossFunction {
-
-  /** Calculates the gradient as well as the loss given a data point and the 
weight vector
-    *
-    * @param dataPoint
-    * @param weightVector
-    * @return
-    */
-  def lossGradient(dataPoint: LabeledVector, weightVector: WeightVector): 
(Double, WeightVector) = {
-    val prediction = predictionFunction.predict(dataPoint.vector, weightVector)
-
-    val loss = partialLossFunction.loss(prediction, dataPoint.label)
-
-    val lossDerivative = partialLossFunction.derivative(prediction, 
dataPoint.label)
-
-    val WeightVector(weightGradient, interceptGradient) =
-      predictionFunction.gradient(dataPoint.vector, weightVector)
-
-    BLAS.scal(lossDerivative, weightGradient)
-
-    (loss, WeightVector(weightGradient, lossDerivative * interceptGradient))
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
deleted file mode 100644
index 5cf69b6..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PartialLossFunction.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.optimization
-
-/** Represents loss functions which can be used with the 
[[GenericLossFunction]].
-  *
-  */
-trait PartialLossFunction extends Serializable {
-  /** Calculates the loss depending on the label and the prediction
-    *
-    * @param prediction
-    * @param label
-    * @return
-    */
-  def loss(prediction: Double, label: Double): Double
-
-  /** Calculates the derivative of the [[PartialLossFunction]]
-    * 
-    * @param prediction
-    * @param label
-    * @return
-    */
-  def derivative(prediction: Double, label: Double): Double
-}
-
-/** Squared loss function which can be used with the [[GenericLossFunction]]
-  *
-  * The [[SquaredLoss]] function implements `1/2 (prediction - label)^2`
-  */
-object SquaredLoss extends PartialLossFunction {
-
-  /** Calculates the loss depending on the label and the prediction
-    *
-    * @param prediction
-    * @param label
-    * @return
-    */
-  override def loss(prediction: Double, label: Double): Double = {
-    0.5 * (prediction - label) * (prediction - label)
-  }
-
-  /** Calculates the derivative of the [[PartialLossFunction]]
-    *
-    * @param prediction
-    * @param label
-    * @return
-    */
-  override def derivative(prediction: Double, label: Double): Double = {
-    (prediction - label)
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PredictionFunction.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PredictionFunction.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PredictionFunction.scala
deleted file mode 100644
index 38f340a..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/PredictionFunction.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.optimization
-
-import org.apache.flink.ml.common.WeightVector
-import org.apache.flink.ml.math.{Vector => FlinkVector, BLAS}
-
-/** An abstract class for prediction functions to be used in optimization **/
-abstract class PredictionFunction extends Serializable {
-  def predict(features: FlinkVector, weights: WeightVector): Double
-
-  def gradient(features: FlinkVector, weights: WeightVector): WeightVector
-}
-
-/** A linear prediction function **/
-object LinearPrediction extends PredictionFunction {
-  override def predict(features: FlinkVector, weightVector: WeightVector): 
Double = {
-    BLAS.dot(features, weightVector.weights) + weightVector.intercept
-  }
-
-  override def gradient(features: FlinkVector, weights: WeightVector): 
WeightVector = {
-    WeightVector(features.copy, 1)
-  }
-}

Reply via email to