http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
deleted file mode 100644
index 39a031f..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.optimization
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common._
-import org.apache.flink.ml.math.{SparseVector, DenseVector}
-import org.apache.flink.api.scala._
-import org.apache.flink.ml.optimization.IterativeSolver._
-
-/** Base class for optimization algorithms
- *
- */
-abstract class Solver extends Serializable with WithParameters {
-  import Solver._
-
-  /** Provides a solution for the given optimization problem
-    *
-    * @param data A DataSet of LabeledVector (input, output) pairs
-    * @param initialWeights The initial weight vector that will be optimized
-    * @return A Vector of weights optimized for the given problem
-    */
-  def optimize(
-      data: DataSet[LabeledVector],
-      initialWeights: Option[DataSet[WeightVector]])
-    : DataSet[WeightVector]
-
-  /** Creates initial weights vector, creating a DataSet with a WeightVector element
-    *
-    * @param initialWeights An Option that may contain an initial set of weights
-    * @param data The data for which we optimize the weights
-    * @return A DataSet containing a single WeightVector element
-    */
-  def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]],
-                             data: DataSet[LabeledVector]): DataSet[WeightVector] = {
-    // TODO: Faster way to do this?
-    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-    initialWeights match {
-      // Ensure provided weight vector is a DenseVector
-      case Some(wvDS) =>
-        wvDS.map {
-          wv => {
-            val denseWeights = wv.weights match {
-              case dv: DenseVector => dv
-              case sv: SparseVector => sv.toDenseVector
-            }
-            WeightVector(denseWeights, wv.intercept)
-          }
-        }
-      case None => createInitialWeightVector(dimensionsDS)
-    }
-  }
-
-  /** Creates a DataSet with one zero vector. The zero vector has dimension d, which is given
-    * by the dimensionDS.
-    *
-    * @param dimensionDS DataSet with one element d, denoting the dimension of the returned zero
-    *                    vector
-    * @return DataSet of a zero vector of dimension d
-    */
-  def createInitialWeightVector(dimensionDS: DataSet[Int]): DataSet[WeightVector] = {
-    dimensionDS.map {
-      dimension =>
-        val values = Array.fill(dimension)(0.0)
-        new WeightVector(DenseVector(values), 0.0)
-    }
-  }
-
-  // Setters for parameters
-  // TODO(tvas): Provide an option to fit an intercept or not
-  def setLossFunction(lossFunction: LossFunction): this.type = {
-    parameters.add(LossFunction, lossFunction)
-    this
-  }
-
-  def setRegularizationConstant(regularizationConstant: Double): this.type = {
-    parameters.add(RegularizationConstant, regularizationConstant)
-    this
-  }
-}
-
-object Solver {
-  // Define parameters for Solver
-  case object LossFunction extends Parameter[LossFunction] {
-    // TODO(tvas): Should depend on problem, here is where differentiating between classification
-    // and regression could become useful
-    val defaultValue = None
-  }
-
-  case object RegularizationConstant extends Parameter[Double] {
-    val defaultValue = Some(0.0) // TODO(tvas): Properly initialize this, ensure Parameter > 0!
-  }
-}
-
-/** An abstract class for iterative optimization algorithms
-  *
-  * See [[https://en.wikipedia.org/wiki/Iterative_method Iterative Methods on Wikipedia]] for more
-  * info
-  */
-abstract class IterativeSolver() extends Solver {
-
-  // Setters for parameters
-  def setIterations(iterations: Int): this.type = {
-    parameters.add(Iterations, iterations)
-    this
-  }
-
-  def setStepsize(stepsize: Double): this.type = {
-    parameters.add(LearningRate, stepsize)
-    this
-  }
-
-  def setConvergenceThreshold(convergenceThreshold: Double): this.type = {
-    parameters.add(ConvergenceThreshold, convergenceThreshold)
-    this
-  }
-}
-
-object IterativeSolver {
-
-  val MAX_DLOSS: Double = 1e12
-
-  // Define parameters for IterativeSolver
-  case object LearningRate extends Parameter[Double] {
-    val defaultValue = Some(0.1)
-  }
-
-  case object Iterations extends Parameter[Int] {
-    val defaultValue = Some(10)
-  }
-
-  case object ConvergenceThreshold extends Parameter[Double] {
-    val defaultValue = None
-  }
-}

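For reference, the Solver and IterativeSolver classes removed above were configured through
chained setters backed by the Parameter case objects. A minimal usage sketch, assuming a
flink-ml build that still ships these classes (GradientDescent stands in for any concrete
IterativeSolver subclass and is used here only for illustration):

    import org.apache.flink.ml.optimization._

    // Each setter stores its value in the WithParameters map and returns this,
    // so calls can be chained; unset parameters fall back to the defaultValue
    // of the corresponding Parameter case object.
    val solver = GradientDescent()
      .setRegularizationConstant(0.01) // Solver.RegularizationConstant, default 0.0
      .setIterations(100)              // IterativeSolver.Iterations, default 10
      .setStepsize(0.1)                // IterativeSolver.LearningRate, default 0.1
      .setConvergenceThreshold(1e-4)   // IterativeSolver.ConvergenceThreshold, no default
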
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala
deleted file mode 100644
index 554e155..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink
-
-import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunction}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.java.operators.DataSink
-import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment}
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.ml.common.LabeledVector
-
-import scala.reflect.ClassTag
-
-package object ml {
-
-  /** Pimp my [[ExecutionEnvironment]] to directly support `readLibSVM`
-    *
-    * @param executionEnvironment
-    */
-  implicit class RichExecutionEnvironment(executionEnvironment: ExecutionEnvironment) {
-    def readLibSVM(path: String): DataSet[LabeledVector] = {
-      MLUtils.readLibSVM(executionEnvironment, path)
-    }
-  }
-
-  /** Pimp my [[DataSet]] to directly support `writeAsLibSVM`
-    *
-    * @param dataSet
-    */
-  implicit class RichLabeledDataSet(dataSet: DataSet[LabeledVector]) {
-    def writeAsLibSVM(path: String): DataSink[String] = {
-      MLUtils.writeLibSVM(path, dataSet)
-    }
-  }
-
-  implicit class RichDataSet[T](dataSet: DataSet[T]) {
-    def mapWithBcVariable[B, O: TypeInformation: ClassTag](
-        broadcastVariable: DataSet[B])(
-        fun: (T, B) => O)
-      : DataSet[O] = {
-      dataSet.map(new BroadcastSingleElementMapper[T, B, O](dataSet.clean(fun)))
-        .withBroadcastSet(broadcastVariable, "broadcastVariable")
-    }
-
-    def filterWithBcVariable[B, O](broadcastVariable: DataSet[B])(fun: (T, B) => Boolean)
-      : DataSet[T] = {
-      dataSet.filter(new BroadcastSingleElementFilter[T, B](dataSet.clean(fun)))
-        .withBroadcastSet(broadcastVariable, "broadcastVariable")
-    }
-
-    def mapWithBcVariableIteration[B, O: TypeInformation: ClassTag](
-        broadcastVariable: DataSet[B])(fun: (T, B, Int) => O)
-      : DataSet[O] = {
-      dataSet.map(new BroadcastSingleElementMapperWithIteration[T, B, O](dataSet.clean(fun)))
-        .withBroadcastSet(broadcastVariable, "broadcastVariable")
-    }
-  }
-
-  private class BroadcastSingleElementMapper[T, B, O](
-      fun: (T, B) => O)
-    extends RichMapFunction[T, O] {
-    var broadcastVariable: B = _
-
-    @throws(classOf[Exception])
-    override def open(configuration: Configuration): Unit = {
-      broadcastVariable = getRuntimeContext.getBroadcastVariable[B]("broadcastVariable").get(0)
-    }
-
-    override def map(value: T): O = {
-      fun(value, broadcastVariable)
-    }
-  }
-
-  private class BroadcastSingleElementMapperWithIteration[T, B, O](
-      fun: (T, B, Int) => O)
-    extends RichMapFunction[T, O] {
-    var broadcastVariable: B = _
-
-    @throws(classOf[Exception])
-    override def open(configuration: Configuration): Unit = {
-      broadcastVariable = getRuntimeContext.getBroadcastVariable[B]("broadcastVariable").get(0)
-    }
-
-    override def map(value: T): O = {
-      fun(value, broadcastVariable, getIterationRuntimeContext.getSuperstepNumber)
-    }
-  }
-
-  private class BroadcastSingleElementFilter[T, B](
-      fun: (T, B) => Boolean)
-    extends RichFilterFunction[T] {
-    var broadcastVariable: B = _
-
-    @throws(classOf[Exception])
-    override def open(configuration: Configuration): Unit = {
-      broadcastVariable = getRuntimeContext.getBroadcastVariable[B]("broadcastVariable").get(0)
-    }
-
-    override def filter(value: T): Boolean = {
-      fun(value, broadcastVariable)
-    }
-  }
-}

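The removed package object is what provided flink-ml's broadcast helpers: mapWithBcVariable
broadcasts a single-element DataSet (typically a model) to every parallel mapper instead of
collecting it on the client. A small usage sketch, assuming the implicits above are still on
the classpath:

    import org.apache.flink.api.scala._
    import org.apache.flink.ml._ // brings RichDataSet into scope

    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromElements(1.0, 2.0, 3.0)
    // Single-element DataSet holding the sum; it is broadcast, not collected.
    val sum = data.reduce(_ + _)
    // Every element is mapped together with the broadcast value.
    val fractions = data.mapWithBcVariable(sum) { (x, total) => x / total }
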
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala
deleted file mode 100644
index bf5a8b2..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.pipeline
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.ParameterMap
-
-/** [[Predictor]] which represents a pipeline of possibly multiple [[Transformer]] and a trailing
-  * [[Predictor]].
-  *
-  * The [[ChainedPredictor]] can be used as a regular [[Predictor]]. Upon calling the fit method,
-  * the input data is piped through all preceding [[Transformer]] in the pipeline and the resulting
-  * data is given to the trailing [[Predictor]]. The same holds true for the predict operation.
-  *
-  * The pipeline mechanism has been inspired by scikit-learn
-  *
-  * @param transformer Preceding [[Transformer]] of the pipeline
-  * @param predictor Trailing [[Predictor]] of the pipeline
-  * @tparam T Type of the preceding [[Transformer]]
-  * @tparam P Type of the trailing [[Predictor]]
-  */
-case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](transformer: T, predictor: P)
-  extends Predictor[ChainedPredictor[T, P]]{}
-
-object ChainedPredictor{
-
-  /** [[PredictDataSetOperation]] for the [[ChainedPredictor]].
-    *
-    * The [[PredictDataSetOperation]] requires the [[TransformDataSetOperation]] of the preceding
-    * [[Transformer]] and the [[PredictDataSetOperation]] of the trailing [[Predictor]]. Upon
-    * calling predict, the testing data is first transformed by the preceding [[Transformer]] and
-    * the result is then used to calculate the prediction via the trailing [[Predictor]].
-    *
-    * @param transformOperation [[TransformDataSetOperation]] for the preceding [[Transformer]]
-    * @param predictOperation [[PredictDataSetOperation]] for the trailing [[Predictor]]
-    * @tparam T Type of the preceding [[Transformer]]
-    * @tparam P Type of the trailing [[Predictor]]
-    * @tparam Testing Type of the testing data
-    * @tparam Intermediate Type of the intermediate data produced by the preceding [[Transformer]]
-    * @tparam Prediction Type of the predicted data generated by the trailing [[Predictor]]
-    * @return
-    */
-  implicit def chainedPredictOperation[
-      T <: Transformer[T],
-      P <: Predictor[P],
-      Testing,
-      Intermediate,
-      Prediction](
-      implicit transformOperation: TransformDataSetOperation[T, Testing, Intermediate],
-      predictOperation: PredictDataSetOperation[P, Intermediate, Prediction])
-    : PredictDataSetOperation[ChainedPredictor[T, P], Testing, Prediction] = {
-
-    new PredictDataSetOperation[ChainedPredictor[T, P], Testing, Prediction] {
-      override def predictDataSet(
-          instance: ChainedPredictor[T, P],
-          predictParameters: ParameterMap,
-          input: DataSet[Testing])
-        : DataSet[Prediction] = {
-
-        val testing = instance.transformer.transform(input, predictParameters)
-        instance.predictor.predict(testing, predictParameters)
-      }
-    }
-  }
-
-  /** [[FitOperation]] for the [[ChainedPredictor]].
-    *
-    * The [[FitOperation]] requires the [[FitOperation]] and the [[TransformDataSetOperation]] of
-    * the preceding [[Transformer]] as well as the [[FitOperation]] of the trailing [[Predictor]].
-    * Upon calling fit, the preceding [[Transformer]] is first fitted to the training data.
-    * The training data is then transformed by the fitted [[Transformer]]. The transformed data
-    * is then used to fit the [[Predictor]].
-    *
-    * @param fitOperation [[FitOperation]] of the preceding [[Transformer]]
-    * @param transformOperation [[TransformDataSetOperation]] of the preceding [[Transformer]]
-    * @param predictorFitOperation [[FitOperation]] of the trailing [[Predictor]]
-    * @tparam L Type of the preceding [[Transformer]]
-    * @tparam R Type of the trailing [[Predictor]]
-    * @tparam I Type of the training data
-    * @tparam T Type of the intermediate data
-    * @return
-    */
-  implicit def chainedFitOperation[L <: Transformer[L], R <: Predictor[R], I, T](implicit
-    fitOperation: FitOperation[L, I],
-    transformOperation: TransformDataSetOperation[L, I, T],
-    predictorFitOperation: FitOperation[R, T]): FitOperation[ChainedPredictor[L, R], I] = {
-    new FitOperation[ChainedPredictor[L, R], I] {
-      override def fit(
-          instance: ChainedPredictor[L, R],
-          fitParameters: ParameterMap,
-          input: DataSet[I])
-        : Unit = {
-        instance.transformer.fit(input, fitParameters)
-        val intermediateResult = instance.transformer.transform(input, fitParameters)
-        instance.predictor.fit(intermediateResult, fitParameters)
-      }
-    }
-  }
-
-  implicit def chainedEvaluationOperation[
-      T <: Transformer[T],
-      P <: Predictor[P],
-      Testing,
-      Intermediate,
-      PredictionValue](
-      implicit transformOperation: TransformDataSetOperation[T, Testing, Intermediate],
-      evaluateOperation: EvaluateDataSetOperation[P, Intermediate, PredictionValue],
-      testingTypeInformation: TypeInformation[Testing],
-      predictionValueTypeInformation: TypeInformation[PredictionValue])
-    : EvaluateDataSetOperation[ChainedPredictor[T, P], Testing, PredictionValue] = {
-    new EvaluateDataSetOperation[ChainedPredictor[T, P], Testing, PredictionValue] {
-      override def evaluateDataSet(
-          instance: ChainedPredictor[T, P],
-          evaluateParameters: ParameterMap,
-          testing: DataSet[Testing])
-        : DataSet[(PredictionValue, PredictionValue)] = {
-        val intermediate = instance.transformer.transform(testing, evaluateParameters)
-        instance.predictor.evaluate(intermediate, evaluateParameters)
-      }
-    }
-  }
-}

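Chaining was the intended way to use these classes. A sketch of building and running a
two-stage pipeline, assuming the StandardScaler and MultipleLinearRegression implementations
that lived next to this code in flink-ml:

    import org.apache.flink.ml.preprocessing.StandardScaler
    import org.apache.flink.ml.regression.MultipleLinearRegression

    val scaler = StandardScaler()
    val mlr = MultipleLinearRegression()

    // chainPredictor yields ChainedPredictor(scaler, mlr).
    val pipeline = scaler.chainPredictor(mlr)

    // fit first fits the scaler, transforms the training data with it and then
    // fits mlr on the result (chainedFitOperation above); predict pipes the
    // testing data through the same stages.
    pipeline.fit(trainingDS)                      // trainingDS: DataSet[LabeledVector]
    val predictions = pipeline.predict(testingDS) // testingDS: DataSet[Vector]
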
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala
deleted file mode 100644
index bdf917d..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.pipeline
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.ParameterMap
-
-/** [[Transformer]] which represents the chaining of two [[Transformer]].
-  *
-  * A [[ChainedTransformer]] can be treated as a regular [[Transformer]]. Upon calling the fit or
-  * transform operation, the data is piped through all [[Transformer]] of the pipeline.
-  *
-  * The pipeline mechanism has been inspired by scikit-learn
-  *
-  * @param left Left [[Transformer]] of the pipeline
-  * @param right Right [[Transformer]] of the pipeline
-  * @tparam L Type of the left [[Transformer]]
-  * @tparam R Type of the right [[Transformer]]
-  */
-case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](left: L, right: R)
-  extends Transformer[ChainedTransformer[L, R]] {
-}
-
-object ChainedTransformer{
-
-  /** [[TransformDataSetOperation]] implementation for [[ChainedTransformer]].
-    *
-    * First the transform operation of the left [[Transformer]] is called with the input data. This
-    * generates intermediate data which is fed to the right [[Transformer]]'s transform operation.
-    *
-    * @param transformOpLeft [[TransformDataSetOperation]] for the left [[Transformer]]
-    * @param transformOpRight [[TransformDataSetOperation]] for the right [[Transformer]]
-    * @tparam L Type of the left [[Transformer]]
-    * @tparam R Type of the right [[Transformer]]
-    * @tparam I Type of the input data
-    * @tparam T Type of the intermediate output data
-    * @tparam O Type of the output data
-    * @return
-    */
-  implicit def chainedTransformOperation[
-      L <: Transformer[L],
-      R <: Transformer[R],
-      I,
-      T,
-      O](implicit
-      transformOpLeft: TransformDataSetOperation[L, I, T],
-      transformOpRight: TransformDataSetOperation[R, T, O])
-    : TransformDataSetOperation[ChainedTransformer[L, R], I, O] = {
-
-    new TransformDataSetOperation[ChainedTransformer[L, R], I, O] {
-      override def transformDataSet(
-          chain: ChainedTransformer[L, R],
-          transformParameters: ParameterMap,
-          input: DataSet[I]): DataSet[O] = {
-        val intermediateResult = transformOpLeft.transformDataSet(
-          chain.left,
-          transformParameters,
-          input)
-        transformOpRight.transformDataSet(chain.right, transformParameters, intermediateResult)
-      }
-    }
-  }
-
-  /** [[FitOperation]] implementation for [[ChainedTransformer]].
-    *
-    * First the fit operation of the left [[Transformer]] is called with the input data. Then
-    * the data is transformed by this [[Transformer]] and then given to the fit operation of the
-    * right [[Transformer]].
-    *
-    * @param leftFitOperation [[FitOperation]] for the left [[Transformer]]
-    * @param leftTransformOperation [[TransformDataSetOperation]] for the left [[Transformer]]
-    * @param rightFitOperation [[FitOperation]] for the right [[Transformer]]
-    * @tparam L Type of the left [[Transformer]]
-    * @tparam R Type of the right [[Transformer]]
-    * @tparam I Type of the input data
-    * @tparam T Type of the intermediate output data
-    * @return
-    */
-  implicit def chainedFitOperation[L <: Transformer[L], R <: Transformer[R], I, T](implicit
-      leftFitOperation: FitOperation[L, I],
-      leftTransformOperation: TransformDataSetOperation[L, I, T],
-      rightFitOperation: FitOperation[R, T]): FitOperation[ChainedTransformer[L, R], I] = {
-    new FitOperation[ChainedTransformer[L, R], I] {
-      override def fit(
-          instance: ChainedTransformer[L, R],
-          fitParameters: ParameterMap,
-          input: DataSet[I]): Unit = {
-        instance.left.fit(input, fitParameters)
-        val intermediateResult = instance.left.transform(input, fitParameters)
-        instance.right.fit(intermediateResult, fitParameters)
-      }
-    }
-  }
-}

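Two transformers compose the same way into a ChainedTransformer whose fit and transform pipe
the data left to right. A sketch, assuming the MinMaxScaler and PolynomialFeatures
transformers that this commit also removes:

    import org.apache.flink.ml.preprocessing.{MinMaxScaler, PolynomialFeatures}

    val scaler = MinMaxScaler()
    val poly = PolynomialFeatures()

    // chainTransformer yields ChainedTransformer(scaler, poly).
    val chained = scaler.chainTransformer(poly)

    // fit: fits scaler, transforms the data with it, then fits poly on the
    // intermediate result (chainedFitOperation above).
    chained.fit(trainingDS)                         // trainingDS: DataSet[Vector]
    val transformed = chained.transform(trainingDS)
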
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala
deleted file mode 100644
index dbe0782..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala
+++ /dev/null
@@ -1,181 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.pipeline
-
-import scala.reflect.ClassTag
-import scala.reflect.runtime.universe._
-
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
-
-/** Base trait for Flink's pipeline operators.
-  *
-  * An estimator can be fitted to input data. In order to do that the implementing class has
-  * to provide an implementation of a [[FitOperation]] with the correct input type. In order to
-  * make the [[FitOperation]] retrievable by the Scala compiler, the implementation should be
-  * placed in the companion object of the implementing class.
-  *
-  * The pipeline mechanism has been inspired by scikit-learn
-  *
-  * @tparam Self
-  */
-trait Estimator[Self] extends WithParameters {
-  that: Self =>
-
-  /** Fits the estimator to the given input data. The fitting logic is contained in the
-    * [[FitOperation]]. The computed state will be stored in the implementing class.
-    *
-    * @param training Training data
-    * @param fitParameters Additional parameters for the [[FitOperation]]
-    * @param fitOperation [[FitOperation]] which encapsulates the algorithm logic
-    * @tparam Training Type of the training data
-    * @return
-    */
-  def fit[Training](
-      training: DataSet[Training],
-      fitParameters: ParameterMap = ParameterMap.Empty)(implicit
-      fitOperation: FitOperation[Self, Training]): Unit = {
-    FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment)
-    fitOperation.fit(this, fitParameters, training)
-  }
-}
-
-object Estimator{
-
-  /** Fallback [[FitOperation]] type class implementation which is used if no other
-    * [[FitOperation]] with the right input types could be found in the scope of the implementing
-    * class. The fallback [[FitOperation]] makes the system fail in the pre-flight phase by
-    * throwing a [[RuntimeException]] which states the reason for the failure. Usually the error
-    * is a missing [[FitOperation]] implementation for the input types or the wrong chaining
-    * of pipeline operators which have incompatible input/output types.
-    *
-    * @tparam Self Type of the pipeline operator
-    * @tparam Training Type of training data
-    * @return
-    */
-  implicit def fallbackFitOperation[
-      Self: TypeTag,
-      Training: TypeTag]
-    : FitOperation[Self, Training] = {
-    new FitOperation[Self, Training]{
-      override def fit(
-          instance: Self,
-          fitParameters: ParameterMap,
-          input: DataSet[Training])
-        : Unit = {
-        val self = typeOf[Self]
-        val training = typeOf[Training]
-
-        throw new RuntimeException("There is no FitOperation defined for " + self +
-          " which trains on a DataSet[" + training + "]")
-      }
-    }
-  }
-
-  /** Fallback [[PredictDataSetOperation]] if a [[Predictor]] is called with an unsupported input
-    * data type. The fallback [[PredictDataSetOperation]] lets the system fail with a
-    * [[RuntimeException]] stating which input and output data types were inferred but for which
-    * no [[PredictDataSetOperation]] could be found.
-    *
-    * @tparam Self Type of the [[Predictor]]
-    * @tparam Testing Type of the testing data
-    * @return
-    */
-  implicit def fallbackPredictOperation[
-      Self: TypeTag,
-      Testing: TypeTag]
-    : PredictDataSetOperation[Self, Testing, Any] = {
-    new PredictDataSetOperation[Self, Testing, Any] {
-      override def predictDataSet(
-          instance: Self,
-          predictParameters: ParameterMap,
-          input: DataSet[Testing])
-        : DataSet[Any] = {
-        val self = typeOf[Self]
-        val testing = typeOf[Testing]
-
-        throw new RuntimeException("There is no PredictOperation defined for " + self +
-          " which takes a DataSet[" + testing + "] as input.")
-      }
-    }
-  }
-
-  /** Fallback [[TransformDataSetOperation]] for [[Transformer]] which do not support the input or
-    * output type with which they are called. This is usually the case if pipeline operators are
-    * chained which have incompatible input/output types. In order to detect these failures, the
-    * fallback [[TransformDataSetOperation]] throws a [[RuntimeException]] with the corresponding
-    * input/output types. Consequently, a wrong pipeline will be detected at the pre-flight phase
-    * of Flink and thus prior to execution time.
-    *
-    * @tparam Self Type of the [[Transformer]] for which the [[TransformDataSetOperation]] is
-    *              defined
-    * @tparam IN Input data type of the [[TransformDataSetOperation]]
-    * @return
-    */
-  implicit def fallbackTransformOperation[
-      Self: TypeTag,
-      IN: TypeTag]
-    : TransformDataSetOperation[Self, IN, Any] = {
-    new TransformDataSetOperation[Self, IN, Any] {
-      override def transformDataSet(
-          instance: Self,
-          transformParameters: ParameterMap,
-          input: DataSet[IN])
-        : DataSet[Any] = {
-        val self = typeOf[Self]
-        val in = typeOf[IN]
-
-        throw new RuntimeException("There is no TransformOperation defined for " +
-          self + " which takes a DataSet[" + in +
-          "] as input.")
-      }
-    }
-  }
-
-  implicit def fallbackEvaluateOperation[
-      Self: TypeTag,
-      Testing: TypeTag]
-    : EvaluateDataSetOperation[Self, Testing, Any] = {
-    new EvaluateDataSetOperation[Self, Testing, Any] {
-      override def evaluateDataSet(
-          instance: Self,
-          predictParameters: ParameterMap,
-          input: DataSet[Testing])
-        : DataSet[(Any, Any)] = {
-        val self = typeOf[Self]
-        val testing = typeOf[Testing]
-
-        throw new RuntimeException("There is no EvaluateDataSetOperation defined for " + self +
-          " which takes a DataSet[" + testing + "] as input.")
-      }
-    }
-  }
-}
-
-/** Type class for the fit operation of an [[Estimator]].
-  *
-  * The [[FitOperation]] contains a self type parameter so that the Scala compiler looks into
-  * the companion object of this class to find implicit values.
-  *
-  * @tparam Self Type of the [[Estimator]] subclass for which the [[FitOperation]] is defined
-  * @tparam Training Type of the training data
-  */
-trait FitOperation[Self, Training]{
-  def fit(instance: Self, fitParameters: ParameterMap, input: DataSet[Training]): Unit
-}

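The type-class pattern above is easiest to see with a concrete estimator. A hypothetical
sketch (MeanEstimator is not flink-ml API; it only illustrates where the implicit
FitOperation has to live so that implicit search finds it):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.ParameterMap
    import org.apache.flink.ml.pipeline.{Estimator, FitOperation}

    class MeanEstimator extends Estimator[MeanEstimator] {
      var meanOption: Option[DataSet[Double]] = None
    }

    object MeanEstimator {
      // Placed in the companion object, so fit(...) resolves it implicitly.
      implicit val fitMean = new FitOperation[MeanEstimator, Double] {
        override def fit(
            instance: MeanEstimator,
            fitParameters: ParameterMap,
            input: DataSet[Double]): Unit = {
          // Compute the mean as a single-element DataSet and store it.
          instance.meanOption = Some(
            input
              .map(x => (x, 1L))
              .reduce((a, b) => (a._1 + b._1, a._2 + b._2))
              .map(p => p._1 / p._2))
        }
      }
    }

Calling new MeanEstimator().fit(doubleDS) then compiles because the implicit is found, while
fitting on any other element type falls through to fallbackFitOperation and fails pre-flight.
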
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
deleted file mode 100644
index 9d11cff..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.pipeline
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import org.apache.flink.api.scala._
-import org.apache.flink.ml._
-import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
-
-/** Predictor trait for Flink's pipeline operators.
-  *
-  * A [[Predictor]] calculates predictions for testing data based on the model it learned during
-  * the fit operation (training phase). In order to do that, the implementing class has to provide
-  * a [[FitOperation]] and a [[PredictDataSetOperation]] implementation for the correct types. The
-  * implicit values should be put into the scope of the companion object of the implementing
-  * class to make them retrievable for the Scala compiler.
-  *
-  * The pipeline mechanism has been inspired by scikit-learn
-  *
-  * @tparam Self Type of the implementing class
-  */
-trait Predictor[Self] extends Estimator[Self] with WithParameters {
-  that: Self =>
-
-  /** Predict testing data according to the learned model. The implementing class has to provide
-    * a corresponding implementation of [[PredictDataSetOperation]] which contains the prediction
-    * logic.
-    *
-    * @param testing Testing data which shall be predicted
-    * @param predictParameters Additional parameters for the prediction
-    * @param predictor [[PredictDataSetOperation]] which encapsulates the prediction logic
-    * @tparam Testing Type of the testing data
-    * @tparam Prediction Type of the prediction data
-    * @return
-    */
-  def predict[Testing, Prediction](
-      testing: DataSet[Testing],
-      predictParameters: ParameterMap = ParameterMap.Empty)(implicit
-      predictor: PredictDataSetOperation[Self, Testing, Prediction])
-    : DataSet[Prediction] = {
-    FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
-    predictor.predictDataSet(this, predictParameters, testing)
-  }
-
-  /** Evaluates the testing data by computing the prediction value and returning a pair of true
-    * label value and prediction value. It is important that the implementation chooses a Testing
-    * type from which it can extract the true label value.
-    *
-    * @param testing
-    * @param evaluateParameters
-    * @param evaluator
-    * @tparam Testing
-    * @tparam PredictionValue
-    * @return
-    */
-  def evaluate[Testing, PredictionValue](
-      testing: DataSet[Testing],
-      evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit
-      evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue])
-    : DataSet[(PredictionValue, PredictionValue)] = {
-    FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment)
-    evaluator.evaluateDataSet(this, evaluateParameters, testing)
-  }
-}
-
-object Predictor {
-
-  /** Default [[PredictDataSetOperation]] which takes a [[PredictOperation]] to calculate a tuple
-    * of testing element and its prediction value.
-    *
-    * Note: We have to put the TypeInformation implicit values for Testing and PredictionValue
-    * after the PredictOperation implicit parameter. Otherwise, if it's defined as a context
-    * bound, then the Scala compiler does not find the implicit [[PredictOperation]] value.
-    *
-    * @param predictOperation
-    * @param testingTypeInformation
-    * @param predictionValueTypeInformation
-    * @tparam Instance
-    * @tparam Model
-    * @tparam Testing
-    * @tparam PredictionValue
-    * @return
-    */
-  implicit def defaultPredictDataSetOperation[
-      Instance <: Estimator[Instance],
-      Model,
-      Testing,
-      PredictionValue](
-      implicit predictOperation: PredictOperation[Instance, Model, Testing, PredictionValue],
-      testingTypeInformation: TypeInformation[Testing],
-      predictionValueTypeInformation: TypeInformation[PredictionValue])
-    : PredictDataSetOperation[Instance, Testing, (Testing, PredictionValue)] = {
-    new PredictDataSetOperation[Instance, Testing, (Testing, PredictionValue)] {
-      override def predictDataSet(
-          instance: Instance,
-          predictParameters: ParameterMap,
-          input: DataSet[Testing])
-        : DataSet[(Testing, PredictionValue)] = {
-        val resultingParameters = instance.parameters ++ predictParameters
-
-        val model = predictOperation.getModel(instance, resultingParameters)
-
-        implicit val resultTypeInformation = createTypeInformation[(Testing, PredictionValue)]
-
-        input.mapWithBcVariable(model){
-          (element, model) => {
-            (element, predictOperation.predict(element, model))
-          }
-        }
-      }
-    }
-  }
-
-  /** Default [[EvaluateDataSetOperation]] which takes a [[PredictOperation]] to calculate a tuple
-    * of true label value and predicted label value.
-    *
-    * Note: We have to put the TypeInformation implicit values for Testing and PredictionValue
-    * after the PredictOperation implicit parameter. Otherwise, if it's defined as a context
-    * bound, then the Scala compiler does not find the implicit [[PredictOperation]] value.
-    *
-    * @param predictOperation
-    * @param testingTypeInformation
-    * @param predictionValueTypeInformation
-    * @tparam Instance
-    * @tparam Model
-    * @tparam Testing
-    * @tparam PredictionValue
-    * @return
-    */
-  implicit def defaultEvaluateDataSetOperation[
-      Instance <: Estimator[Instance],
-      Model,
-      Testing,
-      PredictionValue](
-      implicit predictOperation: PredictOperation[Instance, Model, Testing, PredictionValue],
-      testingTypeInformation: TypeInformation[Testing],
-      predictionValueTypeInformation: TypeInformation[PredictionValue])
-    : EvaluateDataSetOperation[Instance, (Testing, PredictionValue), PredictionValue] = {
-    new EvaluateDataSetOperation[Instance, (Testing, PredictionValue), PredictionValue] {
-      override def evaluateDataSet(
-          instance: Instance,
-          evaluateParameters: ParameterMap,
-          testing: DataSet[(Testing, PredictionValue)])
-        : DataSet[(PredictionValue, PredictionValue)] = {
-        val resultingParameters = instance.parameters ++ evaluateParameters
-        val model = predictOperation.getModel(instance, resultingParameters)
-
-        implicit val resultTypeInformation = createTypeInformation[(Testing, PredictionValue)]
-
-        testing.mapWithBcVariable(model){
-          (element, model) => {
-            (element._2, predictOperation.predict(element._1, model))
-          }
-        }
-      }
-    }
-  }
-}
-
-/** Type class for the predict operation of [[Predictor]]. This predict operation works on DataSets.
-  *
-  * [[Predictor]]s either have to implement this trait or the [[PredictOperation]] trait. The
-  * implementation has to be made available as an implicit value or function in the scope of
-  * their companion objects.
-  *
-  * The first type parameter is the type of the implementing [[Predictor]] class so that the Scala
-  * compiler includes the companion object of this class in the search scope for the implicit
-  * values.
-  *
-  * @tparam Self Type of [[Predictor]] implementing class
-  * @tparam Testing Type of testing data
-  * @tparam Prediction Type of predicted data
-  */
-trait PredictDataSetOperation[Self, Testing, Prediction] extends Serializable{
-
-  /** Calculates the predictions for all elements in the [[DataSet]] input
-    *
-    * @param instance The Predictor instance that we will use to make the predictions
-    * @param predictParameters The parameters for the prediction
-    * @param input The DataSet containing the unlabeled examples
-    * @return
-    */
-  def predictDataSet(
-      instance: Self,
-      predictParameters: ParameterMap,
-      input: DataSet[Testing])
-    : DataSet[Prediction]
-}
-
-/** Type class for predict operation. It takes an element and the model and then computes the
-  * prediction value for this element.
-  *
-  * It is sufficient for a [[Predictor]] to only implement this trait to support the evaluate and
-  * predict method.
-  *
-  * @tparam Instance The concrete type of the [[Predictor]] that we will use for predictions
-  * @tparam Model The representation of the predictive model for the algorithm, for example a
-  *               Vector of weights
-  * @tparam Testing The type of the example that we will use to make the predictions (input)
-  * @tparam Prediction The type of the label that the prediction operation will produce (output)
-  *
-  */
-trait PredictOperation[Instance, Model, Testing, Prediction] extends Serializable{
-
-  /** Defines how to retrieve the model of the type for which this operation was defined
-    *
-    * @param instance The Predictor instance that we will use to make the predictions
-    * @param predictParameters The parameters for the prediction
-    * @return A DataSet with the model representation as its only element
-    */
-  def getModel(instance: Instance, predictParameters: ParameterMap): DataSet[Model]
-
-  /** Calculates the prediction for a single element given the model of the [[Predictor]].
-    *
-    * @param value The unlabeled example on which we make the prediction
-    * @param model The model representation of the prediction algorithm
-    * @return A label for the provided example of type [[Prediction]]
-    */
-  def predict(value: Testing, model: Model): Prediction
-}
-
-/** Type class for the evaluate operation of [[Predictor]]. This evaluate operation works on
-  * DataSets.
-  *
-  * It takes a [[DataSet]] of some type. For each element of this [[DataSet]] the evaluate method
-  * computes the prediction value and returns a tuple of true label value and prediction value.
-  *
-  * @tparam Instance The concrete type of the Predictor instance that we will use to make the
-  *                  predictions
-  * @tparam Testing The type of the example that we will use to make the predictions (input)
-  * @tparam Prediction The type of the label that the prediction operation will produce (output)
-  *
-  */
-trait EvaluateDataSetOperation[Instance, Testing, Prediction] extends Serializable{
-  def evaluateDataSet(
-      instance: Instance,
-      evaluateParameters: ParameterMap,
-      testing: DataSet[Testing])
-    : DataSet[(Prediction, Prediction)]
-}

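With those defaults in place, a Predictor that ships a single element-wise PredictOperation
gets both predict and evaluate on DataSets for free. A usage sketch, assuming the flink-ml
MultipleLinearRegression and LabeledVector of this codebase:

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.LabeledVector
    import org.apache.flink.ml.regression.MultipleLinearRegression

    val mlr = MultipleLinearRegression()
    mlr.fit(trainingDS) // trainingDS: DataSet[LabeledVector]

    // defaultEvaluateDataSetOperation consumes (input, trueLabel) pairs and
    // emits (trueLabel, prediction) pairs.
    val truthAndPrediction: DataSet[(Double, Double)] =
      mlr.evaluate(testDS.map(lv => (lv.vector, lv.label)))
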
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala
deleted file mode 100644
index 014ad2b..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.pipeline
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.DataSet
-import org.apache.flink.ml._
-import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters}
-
-import scala.reflect.ClassTag
-
-/** Transformer trait for Flink's pipeline operators.
-  *
-  * A Transformer transforms a [[DataSet]] of an input type into a [[DataSet]] of an output type.
-  * Furthermore, a [[Transformer]] is also an [[Estimator]], because some transformations depend
-  * on the training data. In order to do that the implementing class has to provide a
-  * [[TransformDataSetOperation]] and [[FitOperation]] implementation. The Scala compiler finds
-  * these implicit values if they are put in the scope of the companion object of the
-  * implementing class.
-  *
-  * [[Transformer]] can be chained with other [[Transformer]] and [[Predictor]] to create
-  * pipelines. These pipelines can consist of an arbitrary number of [[Transformer]] and at most
-  * one trailing [[Predictor]].
-  *
-  * The pipeline mechanism has been inspired by scikit-learn
-  *
-  * @tparam Self
-  */
-trait Transformer[Self <: Transformer[Self]]
-  extends Estimator[Self]
-  with WithParameters
-  with Serializable {
-  that: Self =>
-
-  /** Transform operation which transforms an input [[DataSet]] of type I into an output [[DataSet]]
-    * of type O. The actual transform operation is implemented within the
-    * [[TransformDataSetOperation]].
-    *
-    * @param input Input [[DataSet]] of type I
-    * @param transformParameters Additional parameters for the [[TransformDataSetOperation]]
-    * @param transformOperation [[TransformDataSetOperation]] which encapsulates the algorithm's
-    *                           logic
-    * @tparam Input Input data type
-    * @tparam Output Output data type
-    * @return
-    */
-  def transform[Input, Output](
-      input: DataSet[Input],
-      transformParameters: ParameterMap = ParameterMap.Empty)
-      (implicit transformOperation: TransformDataSetOperation[Self, Input, Output])
-    : DataSet[Output] = {
-    FlinkMLTools.registerFlinkMLTypes(input.getExecutionEnvironment)
-    transformOperation.transformDataSet(that, transformParameters, input)
-  }
-
-  /** Chains two [[Transformer]] to form a [[ChainedTransformer]].
-    *
-    * @param transformer Right side transformer of the resulting pipeline
-    * @tparam T Type of the [[Transformer]]
-    * @return
-    */
-  def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T] = {
-    ChainedTransformer(this, transformer)
-  }
-
-  /** Chains a [[Transformer]] with a [[Predictor]] to form a [[ChainedPredictor]].
-    *
-    * @param predictor Trailing [[Predictor]] of the resulting pipeline
-    * @tparam P Type of the [[Predictor]]
-    * @return
-    */
-  def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P] = {
-    ChainedPredictor(this, predictor)
-  }
-}
-
-object Transformer{
-  implicit def defaultTransformDataSetOperation[
-      Instance <: Estimator[Instance],
-      Model,
-      Input,
-      Output](
-      implicit transformOperation: TransformOperation[Instance, Model, Input, Output],
-      outputTypeInformation: TypeInformation[Output],
-      outputClassTag: ClassTag[Output])
-    : TransformDataSetOperation[Instance, Input, Output] = {
-    new TransformDataSetOperation[Instance, Input, Output] {
-      override def transformDataSet(
-          instance: Instance,
-          transformParameters: ParameterMap,
-          input: DataSet[Input])
-        : DataSet[Output] = {
-        val resultingParameters = instance.parameters ++ transformParameters
-        val model = transformOperation.getModel(instance, resultingParameters)
-
-        input.mapWithBcVariable(model){
-          (element, model) => transformOperation.transform(element, model)
-        }
-      }
-    }
-  }
-}
-
-/** Type class for a transform operation of [[Transformer]]. This works on [[DataSet]] of elements.
-  *
-  * The [[TransformDataSetOperation]] contains a self type parameter so that the Scala compiler
-  * looks into the companion object of this class to find implicit values.
-  *
-  * @tparam Instance Type of the [[Transformer]] for which the [[TransformDataSetOperation]] is
-  *                  defined
-  * @tparam Input Input data type
-  * @tparam Output Output data type
-  */
-trait TransformDataSetOperation[Instance, Input, Output] extends Serializable{
-  def transformDataSet(
-      instance: Instance,
-      transformParameters: ParameterMap,
-      input: DataSet[Input])
-    : DataSet[Output]
-}
-
-/** Type class for a transform operation which works on a single element and the corresponding
-  * model of the [[Transformer]].
-  *
-  * @tparam Instance
-  * @tparam Model
-  * @tparam Input
-  * @tparam Output
-  */
-trait TransformOperation[Instance, Model, Input, Output] extends Serializable{
-
-  /** Retrieves the model of the [[Transformer]] for which this operation has been defined.
-    *
-    * @param instance
-    * @param transformParameters
-    * @return
-    */
-  def getModel(instance: Instance, transformParameters: ParameterMap): DataSet[Model]
-
-  /** Transforms a single element with respect to the model associated with the respective
-    * [[Transformer]]
-    *
-    * @param element
-    * @param model
-    * @return
-    */
-  def transform(element: Input, model: Model): Output
-}

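defaultTransformDataSetOperation above is the lifting step: implement only the element-wise
TransformOperation and the DataSet-level operation is derived, with the model shipped to the
workers via mapWithBcVariable. A hypothetical sketch (OffsetTransformer is not flink-ml API):

    import org.apache.flink.api.scala._
    import org.apache.flink.ml.common.ParameterMap
    import org.apache.flink.ml.pipeline.{TransformOperation, Transformer}

    // Adds a learned offset to every Double element.
    class OffsetTransformer extends Transformer[OffsetTransformer] {
      var offsetOption: Option[DataSet[Double]] = None
    }

    object OffsetTransformer {
      implicit val transformDoubles =
        new TransformOperation[OffsetTransformer, Double, Double, Double] {
          // The model is a single-element DataSet holding the offset.
          override def getModel(
              instance: OffsetTransformer,
              transformParameters: ParameterMap): DataSet[Double] = {
            instance.offsetOption.get
          }

          // Applied per element after the model has been broadcast.
          override def transform(element: Double, model: Double): Double = {
            element + model
          }
        }
    }
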
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
deleted file mode 100644
index 217e2c2..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.ml.preprocessing
-
-import breeze.linalg
-import breeze.linalg.{max, min}
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.ml._
-import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
-import org.apache.flink.ml.math.Breeze._
-import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
-import org.apache.flink.ml.pipeline.{TransformDataSetOperation, FitOperation,
-Transformer}
-import org.apache.flink.ml.preprocessing.MinMaxScaler.{Max, Min}
-
-import scala.reflect.ClassTag
-
-/** Scales observations, so that all features are in a user-specified range.
-  * By default for [[MinMaxScaler]] transformer range = [0,1].
-  *
-  * This transformer takes a subtype of [[Vector]] of values and maps it to a
-  * scaled subtype of [[Vector]] such that each feature lies within a user-specified range.
-  *
-  * This transformer can be prepended to all [[Transformer]] and
-  * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect as input a subtype
-  * of [[Vector]] or a [[LabeledVector]].
-  *
-  * @example
-  * {{{
-  *               val trainingDS: DataSet[Vector] = env.fromCollection(data)
-  *               val transformer = MinMaxScaler().setMin(-1.0)
-  *
-  *               transformer.fit(trainingDS)
-  *               val transformedDS = transformer.transform(trainingDS)
-  * }}}
-  *
-  * =Parameters=
-  *
-  * - [[Min]]: The minimum value of the range of the transformed data set; by default equal to 0
-  * - [[Max]]: The maximum value of the range of the transformed data set; by default
-  *   equal to 1
-  */
-class MinMaxScaler extends Transformer[MinMaxScaler] {
-
-  private [preprocessing] var metricsOption: Option[
-      DataSet[(linalg.Vector[Double], linalg.Vector[Double])]
-    ] = None
-
-  /** Sets the minimum for the range of the transformed data
-    *
-    * @param min the user-specified minimum value.
-    * @return the MinMaxScaler instance with its minimum value set to the user-specified value.
-    */
-  def setMin(min: Double): MinMaxScaler = {
-    parameters.add(Min, min)
-    this
-  }
-
-  /** Sets the maximum for the range of the transformed data
-    *
-    * @param max the user-specified maximum value.
-    * @return the MinMaxScaler instance with its maximum value set to the user-specified value.
-    */
-  def setMax(max: Double): MinMaxScaler = {
-    parameters.add(Max, max)
-    this
-  }
-}
-
-object MinMaxScaler {
-
-  // ====================================== Parameters =============================================
-
-  case object Min extends Parameter[Double] {
-    override val defaultValue: Option[Double] = Some(0.0)
-  }
-
-  case object Max extends Parameter[Double] {
-    override val defaultValue: Option[Double] = Some(1.0)
-  }
-
-  // ==================================== Factory methods ==========================================
-
-  def apply(): MinMaxScaler = {
-    new MinMaxScaler()
-  }
-
-  // ====================================== Operations =============================================
-
-  /** Trains the [[MinMaxScaler]] by learning the minimum and maximum of each feature of the
-    * training data. These values are used in the transform step to transform the given input data.
-    *
-    * @tparam T Input data type which is a subtype of [[Vector]]
-    * @return [[FitOperation]] training the [[MinMaxScaler]] on subtypes of [[Vector]]
-    */
-  implicit def fitVectorMinMaxScaler[T <: Vector] = new FitOperation[MinMaxScaler, T] {
-    override def fit(instance: MinMaxScaler, fitParameters: ParameterMap, input: DataSet[T])
-    : Unit = {
-      val metrics = extractFeatureMinMaxVectors(input)
-
-      instance.metricsOption = Some(metrics)
-    }
-  }
-
-  /** Trains the [[MinMaxScaler]] by learning the minimum and maximum of the features of the
-    * training data which is of type [[LabeledVector]]. The minimum and maximum are used to
-    * transform the given input data.
-    *
-    */
-  implicit val fitLabeledVectorMinMaxScaler = {
-    new FitOperation[MinMaxScaler, LabeledVector] {
-      override def fit(
-        instance: MinMaxScaler,
-        fitParameters: ParameterMap,
-        input: DataSet[LabeledVector])
-      : Unit = {
-        val vectorDS = input.map(_.vector)
-        val metrics = extractFeatureMinMaxVectors(vectorDS)
-
-        instance.metricsOption = Some(metrics)
-      }
-    }
-  }
-
-  /** Calculates in one pass over the data the features' minimum and maximum values.
-    *
-    * @param dataSet The data set for which we want to calculate the minimum and maximum values.
-    * @return DataSet containing a single tuple of two vectors (minVector, maxVector).
-    *         The first vector represents the minimum values vector and the second is the maximum
-    *         values vector.
-    */
-  private def extractFeatureMinMaxVectors[T <: Vector](dataSet: DataSet[T])
-  : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
-
-    val minMax = dataSet.map {
-      v => (v.asBreeze, v.asBreeze)
-    }.reduce {
-      (minMax1, minMax2) => {
-
-        val tempMinimum = min(minMax1._1, minMax2._1)
-        val tempMaximum = max(minMax1._2, minMax2._2)
-
-        (tempMinimum, tempMaximum)
-      }
-    }
-    minMax
-  }
-
-  /** [[TransformDataSetOperation]] which scales input data of subtype of [[Vector]] with respect
-    * to the calculated minimum and maximum of the training data. The minimum and maximum
-    * values of the resulting data are configurable.
-    *
-    * @tparam T Type of the input and output data which has to be a subtype of [[Vector]]
-    * @return [[TransformDataSetOperation]] scaling subtypes of [[Vector]] such that the feature
-    *         values are in the configured range
-    */
-  implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag]
-  = {
-    new TransformDataSetOperation[MinMaxScaler, T, T] {
-      override def transformDataSet(
-        instance: MinMaxScaler,
-        transformParameters: ParameterMap,
-        input: DataSet[T])
-      : DataSet[T] = {
-
-        val resultingParameters = instance.parameters ++ transformParameters
-        val min = resultingParameters(Min)
-        val max = resultingParameters(Max)
-
-        instance.metricsOption match {
-          case Some(metrics) => {
-            input.mapWithBcVariable(metrics) {
-              (vector, metrics) => {
-                val (broadcastMin, broadcastMax) = metrics
-                scaleVector(vector, broadcastMin, broadcastMax, min, max)
-              }
-            }
-          }
-
-          case None =>
-            throw new RuntimeException("The MinMaxScaler has not been fitted to the data. " +
-              "This is necessary to estimate the minimum and maximum of the data.")
-        }
-      }
-    }
-  }
-
-  implicit val transformLabeledVectors = {
-    new TransformDataSetOperation[MinMaxScaler, LabeledVector, LabeledVector] {
-      override def transformDataSet(instance: MinMaxScaler,
-        transformParameters: ParameterMap,
-        input: DataSet[LabeledVector]): DataSet[LabeledVector] = {
-        val resultingParameters = instance.parameters ++ transformParameters
-        val min = resultingParameters(Min)
-        val max = resultingParameters(Max)
-
-        instance.metricsOption match {
-          case Some(metrics) => {
-            input.mapWithBcVariable(metrics) {
-              (labeledVector, metrics) => {
-                val (broadcastMin, broadcastMax) = metrics
-                val LabeledVector(label, vector) = labeledVector
-
-                LabeledVector(label, scaleVector(vector, broadcastMin, broadcastMax, min, max))
-              }
-            }
-          }
-
-          case None =>
-            throw new RuntimeException("The MinMaxScaler has not been fitted to the data. " +
-              "This is necessary to estimate the minimum and maximum of the data.")
-        }
-      }
-    }
-  }
-
-  /** Scales a vector such that its features lie in the range [min, max]
-    *
-    * @param vector Vector to scale
-    * @param broadcastMin Vector containing for each feature the minimal value in the training set
-    * @param broadcastMax Vector containing for each feature the maximal value in the training set
-    * @param min Minimal value of range
-    * @param max Maximal value of range
-    * @tparam T Type of [[Vector]]
-    * @return Scaled feature vector
-    */
-  private def scaleVector[T <: Vector: BreezeVectorConverter](
-      vector: T,
-      broadcastMin: linalg.Vector[Double],
-      broadcastMax: linalg.Vector[Double],
-      min: Double,
-      max: Double)
-    : T = {
-    var myVector = vector.asBreeze
-
-    // handle the case where a feature takes only one value
-    val rangePerFeature = (broadcastMax - broadcastMin)
-    for (i <- 0 until rangePerFeature.size) {
-      if (rangePerFeature(i) == 0.0) {
-        rangePerFeature(i) = 1.0
-      }
-    }
-
-    myVector -= broadcastMin
-    myVector :/= rangePerFeature
-    myVector = (myVector :* (max - min)) + min
-    myVector.fromBreeze
-  }
-}
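
For reference, the per-feature rule implemented by scaleVector above reduces to
x' = (x - xMin) / (xMax - xMin) * (max - min) + min, with a zero range treated
as 1.0. A minimal standalone sketch of the same rule in plain Scala; the object
name, method, and demo values are illustrative and not part of the removed file:

{{{
object MinMaxSketch {
  // Per-feature rule: x' = (x - xMin) / (xMax - xMin) * (max - min) + min,
  // with a zero range treated as 1.0, mirroring the fix-up in scaleVector.
  def scale(x: Array[Double], xMin: Array[Double], xMax: Array[Double],
            min: Double, max: Double): Array[Double] =
    x.indices.map { i =>
      val range = if (xMax(i) == xMin(i)) 1.0 else xMax(i) - xMin(i)
      (x(i) - xMin(i)) / range * (max - min) + min
    }.toArray

  def main(args: Array[String]): Unit = {
    // feature 0 spans [0, 10] in the training data, feature 1 is constant
    val scaled = scale(Array(5.0, 5.0), Array(0.0, 5.0), Array(10.0, 5.0), 0.0, 1.0)
    println(scaled.mkString(", ")) // 0.5, 0.0
  }
}
}}}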

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
deleted file mode 100644
index f1c788e..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.preprocessing
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala.{DataSet, _}
-import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
-import org.apache.flink.ml.math.{Vector, VectorBuilder}
-import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer}
-import org.apache.flink.ml.preprocessing.PolynomialFeatures.Degree
-
-import scala.reflect.ClassTag
-
-/** Maps a vector into the polynomial feature space.
-  *
-  * This transformer takes a vector of values `(x, y, z, ...)` and maps it into the
-  * polynomial feature space of degree `d`. That is to say, it calculates the following
-  * representation:
-  *
-  * `(x, y, z, x^2, xy, y^2, yz, z^2, x^3, x^2y, x^2z, xyz, ...)^T`
-  *
-  * This transformer can be prepended to all [[org.apache.flink.ml.pipeline.Transformer]] and
-  * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect an input of
-  * [[LabeledVector]].
-  *
-  * @example
-  *          {{{
-  *             val trainingDS: DataSet[LabeledVector] = ...
-  *
-  *             val polyFeatures = PolynomialFeatures()
-  *               .setDegree(3)
-  *
-  *             val mlr = MultipleLinearRegression()
-  *
-  *             val pipeline = polyFeatures.chainPredictor(mlr)
-  *
-  *             pipeline.fit(trainingDS)
-  *          }}}
-  *
-  * =Parameters=
-  *
-  *  - [[org.apache.flink.ml.preprocessing.PolynomialFeatures.Degree]]: Maximum polynomial degree
-  */
-class PolynomialFeatures extends Transformer[PolynomialFeatures] {
-
-  def setDegree(degree: Int): PolynomialFeatures = {
-    parameters.add(Degree, degree)
-    this
-  }
-}
-
-object PolynomialFeatures {
-
-  // ====================================== Parameters =============================================
-
-  case object Degree extends Parameter[Int] {
-    override val defaultValue: Option[Int] = Some(1)
-  }
-
-  // =================================== Factory methods ===========================================
-
-  def apply(): PolynomialFeatures = {
-    new PolynomialFeatures()
-  }
-
-  // ====================================== Operations =============================================
-
-  /** The [[PolynomialFeatures]] transformer does not need a fitting phase.
-    *
-    * @tparam T The fit operation works with arbitrary input types
-    * @return [[FitOperation]] which does nothing
-    */
-  implicit def fitNoOp[T] = {
-    new FitOperation[PolynomialFeatures, T]{
-      override def fit(
-          instance: PolynomialFeatures,
-          fitParameters: ParameterMap,
-          input: DataSet[T])
-        : Unit = {}
-    }
-  }
-
-  /** [[org.apache.flink.ml.pipeline.TransformDataSetOperation]] to map a [[Vector]] into the
-    * polynomial feature space.
-    *
-    * @tparam T Subclass of [[Vector]]
-    * @return [[TransformDataSetOperation]] mapping subtypes of [[Vector]] into the polynomial
-    *         feature space
-    */
-  implicit def transformVectorIntoPolynomialBase[
-      T <: Vector : VectorBuilder: TypeInformation: ClassTag
-    ] = {
-    new TransformDataSetOperation[PolynomialFeatures, T, T] {
-      override def transformDataSet(
-          instance: PolynomialFeatures,
-          transformParameters: ParameterMap,
-          input: DataSet[T])
-        : DataSet[T] = {
-        val resultingParameters = instance.parameters ++ transformParameters
-
-        val degree = resultingParameters(Degree)
-
-        input.map {
-          vector => {
-            calculatePolynomial(degree, vector)
-          }
-        }
-      }
-    }
-  }
-
-  /** [[org.apache.flink.ml.pipeline.TransformDataSetOperation]] to map a [[LabeledVector]] into the
-    * polynomial feature space.
-    */
-  implicit val transformLabeledVectorIntoPolynomialBase =
-    new TransformDataSetOperation[PolynomialFeatures, LabeledVector, LabeledVector] {
-
-    override def transformDataSet(
-        instance: PolynomialFeatures,
-        transformParameters: ParameterMap,
-        input: DataSet[LabeledVector])
-      : DataSet[LabeledVector] = {
-      val resultingParameters = instance.parameters ++ transformParameters
-
-      val degree = resultingParameters(Degree)
-
-      input.map {
-        labeledVector => {
-          val vector = labeledVector.vector
-          val label = labeledVector.label
-
-          val transformedVector = calculatePolynomial(degree, vector)
-
-          LabeledVector(label, transformedVector)
-        }
-      }
-    }
-  }
-
-
-  private def calculatePolynomial[T <: Vector: VectorBuilder](degree: Int, vector: T): T = {
-    val builder = implicitly[VectorBuilder[T]]
-    builder.build(calculateCombinedCombinations(degree, vector))
-  }
-
-  /** Calculates for a given vector its representation in the polynomial feature space.
-    *
-    * @param degree Maximum degree of polynomial
-    * @param vector Values of the polynomial variables
-    * @return List of polynomial values
-    */
-  private def calculateCombinedCombinations(degree: Int, vector: Vector): List[Double] = {
-    if(degree == 0) {
-      List()
-    } else {
-      val partialResult = calculateCombinedCombinations(degree - 1, vector)
-
-      val combinations = calculateCombinations(vector.size, degree)
-
-      val result = combinations map {
-        combination =>
-          combination.zipWithIndex.map{
-            case (exp, idx) => math.pow(vector(idx), exp)
-          }.fold(1.0)(_ * _)
-      }
-
-      result ::: partialResult
-    }
-
-  }
-
-  /** Calculates all possible combinations of a polynomial of degree `value`, where the polynomial
-    * can consist of up to `length` factors. The return value is the list of the exponents of the
-    * individual factors.
-    *
-    * @param length maximum number of factors
-    * @param value degree of polynomial
-    * @return List of lists which contain the exponents of the individual factors
-    */
-  private def calculateCombinations(length: Int, value: Int): List[List[Int]] = {
-    if(length == 0) {
-      List()
-    } else if (length == 1) {
-      List(List(value))
-    } else {
-      value to 0 by -1 flatMap {
-        v =>
-          calculateCombinations(length - 1, value - v) map {
-            v::_
-          }
-      } toList
-    }
-  }
-}
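
The recursion in calculateCombinations above enumerates, for a fixed degree,
every way of distributing that degree over up to `length` factors. A
self-contained sketch of the same recursion in plain Scala; the object name and
demo values are illustrative, not taken from the removed file:

{{{
object PolynomialSketch {
  // All exponent tuples of `length` factors summing to exactly `value`
  def combinations(length: Int, value: Int): List[List[Int]] =
    if (length == 0) List()
    else if (length == 1) List(List(value))
    else (value to 0 by -1).toList.flatMap { v =>
      combinations(length - 1, value - v).map(v :: _)
    }

  def main(args: Array[String]): Unit = {
    // For two variables (x, y) and degree 2: x^2, xy, y^2
    println(combinations(2, 2)) // List(List(2, 0), List(1, 1), List(0, 2))
  }
}
}}}

For two variables and degree 2 this yields the exponent tuples (2,0), (1,1) and
(0,2), i.e. the monomials x^2, xy and y^2 that the transformer appends.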

http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
----------------------------------------------------------------------
diff --git 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 
b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
deleted file mode 100644
index 82e8abf..0000000
--- 
a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.preprocessing
-
-import breeze.linalg
-import breeze.numerics.sqrt
-import breeze.numerics.sqrt._
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.api.scala._
-import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
-import org.apache.flink.ml.math.Breeze._
-import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
-import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation, Transformer}
-import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
-
-import scala.reflect.ClassTag
-
-/** Scales observations so that all features have a user-specified mean and standard deviation.
-  * By default, the [[StandardScaler]] transformer uses mean=0.0 and std=1.0.
-  *
-  * This transformer takes a subtype of [[Vector]] of values and maps it to a
-  * scaled subtype of [[Vector]] such that each feature has a user-specified mean and standard
-  * deviation.
-  *
-  * This transformer can be prepended to all [[Transformer]] and
-  * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect as input a subtype
-  * of [[Vector]].
-  *
-  * @example
-  *          {{{
-  *            val trainingDS: DataSet[Vector] = env.fromCollection(data)
-  *            val transformer = StandardScaler().setMean(10.0).setStd(2.0)
-  *
-  *            transformer.fit(trainingDS)
-  *            val transformedDS = transformer.transform(trainingDS)
-  *          }}}
-  *
-  * =Parameters=
-  *
-  * - [[Mean]]: The mean value of the transformed data set; by default equal to 0
-  * - [[Std]]: The standard deviation of the transformed data set; by default equal to 1
-  */
-class StandardScaler extends Transformer[StandardScaler] {
-
-  private[preprocessing] var metricsOption: Option[
-      DataSet[(linalg.Vector[Double], linalg.Vector[Double])]
-    ] = None
-
-  /** Sets the target mean of the transformed data
-    *
-    * @param mu the user-specified mean value.
-    * @return the StandardScaler instance with its mean value set to the user-specified value
-    */
-  def setMean(mu: Double): StandardScaler = {
-    parameters.add(Mean, mu)
-    this
-  }
-
-  /** Sets the target standard deviation of the transformed data
-    *
-    * @param std the user-specified std value. A value of 0.0 is ignored,
-    *            leaving the std at its current value (default: 1.0).
-    * @return the StandardScaler instance with its std value set to the user-specified value
-    */
-  def setStd(std: Double): StandardScaler = {
-    if (std == 0.0) {
-      return this
-    }
-    parameters.add(Std, std)
-    this
-  }
-}
-
-object StandardScaler {
-
-  // ====================================== Parameters =============================================
-
-  case object Mean extends Parameter[Double] {
-    override val defaultValue: Option[Double] = Some(0.0)
-  }
-
-  case object Std extends Parameter[Double] {
-    override val defaultValue: Option[Double] = Some(1.0)
-  }
-
-  // ==================================== Factory methods ==========================================
-
-  def apply(): StandardScaler = {
-    new StandardScaler()
-  }
-
-  // ====================================== Operations =============================================
-
-  /** Trains the [[org.apache.flink.ml.preprocessing.StandardScaler]] by learning the mean and
-    * standard deviation of the training data. These values are used in the transform step
-    * to transform the given input data.
-    *
-    * @tparam T Input data type which is a subtype of [[Vector]]
-    * @return [[FitOperation]] training the [[StandardScaler]] on subtypes of [[Vector]]
-    */
-  implicit def fitVectorStandardScaler[T <: Vector] = new FitOperation[StandardScaler, T] {
-    override def fit(instance: StandardScaler, fitParameters: ParameterMap, input: DataSet[T])
-      : Unit = {
-      val metrics = extractFeatureMetrics(input)
-
-      instance.metricsOption = Some(metrics)
-    }
-  }
-
-  /** Trains the [[StandardScaler]] by learning the mean and standard deviation of the training
-    * data which is of type [[LabeledVector]]. The mean and standard deviation are used to
-    * transform the given input data.
-    *
-    */
-  implicit val fitLabeledVectorStandardScaler = {
-    new FitOperation[StandardScaler, LabeledVector] {
-      override def fit(
-          instance: StandardScaler,
-          fitParameters: ParameterMap,
-          input: DataSet[LabeledVector])
-        : Unit = {
-        val vectorDS = input.map(_.vector)
-        val metrics = extractFeatureMetrics(vectorDS)
-
-        instance.metricsOption = Some(metrics)
-      }
-    }
-  }
-
-  /** Trains the [[StandardScaler]] by learning the mean and standard deviation of the training
-    * data which is of type ([[Vector]], Double). The mean and standard deviation are used to
-    * transform the given input data.
-    *
-    */
-  implicit def fitLabelVectorTupleStandardScaler
-  [T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = {
-    new FitOperation[StandardScaler, (T, Double)] {
-      override def fit(
-          instance: StandardScaler,
-          fitParameters: ParameterMap,
-          input: DataSet[(T, Double)])
-      : Unit = {
-        val vectorDS = input.map(_._1)
-        val metrics = extractFeatureMetrics(vectorDS)
-
-        instance.metricsOption = Some(metrics)
-      }
-    }
-  }
-
-  /** Calculates in one pass over the data the features' mean and standard deviation.
-    * For the calculation of the standard deviation with one pass over the data,
-    * the Youngs & Cramer algorithm was used:
-    * [[http://www.cs.yale.edu/publications/techreports/tr222.pdf]]
-    *
-    *
-    * @param dataSet The data set for which we want to calculate mean and variance
-    * @return  DataSet containing a single tuple of two vectors (meanVector, stdVector).
-    *          The first vector represents the mean vector and the second is the standard
-    *          deviation vector.
-    */
-  private def extractFeatureMetrics[T <: Vector](dataSet: DataSet[T])
-  : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
-    val metrics = dataSet.map{
-      v => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size))
-    }.reduce{
-      (metrics1, metrics2) => {
-        /* We use formula 1.5b of the cited technical report for the combination of partial
-         * sums of squares. According to 1.5b:
-         * val temp1 : m/n(m+n)
-         * val temp2 : n/m
-         */
-        val temp1 = metrics1._1 / (metrics2._1 * (metrics1._1 + metrics2._1))
-        val temp2 = metrics2._1 / metrics1._1
-        val tempVector = (metrics1._2 * temp2) - metrics2._2
-        val tempS = (metrics1._3 + metrics2._3) + (tempVector :* tempVector) * temp1
-
-        (metrics1._1 + metrics2._1, metrics1._2 + metrics2._2, tempS)
-      }
-    }.map{
-      metric => {
-        val varianceVector = sqrt(metric._3 / metric._1)
-
-        for (i <- 0 until varianceVector.size) {
-          if (varianceVector(i) == 0.0) {
-            varianceVector.update(i, 1.0)
-          }
-        }
-        (metric._2 / metric._1, varianceVector)
-      }
-    }
-    metrics
-  }
-
-  /** Base class for StandardScaler's [[TransformOperation]]. This class has to be extended for
-    * all types which are supported by [[StandardScaler]]'s transform operation.
-    *
-    * @tparam T Type of the input and output data
-    */
-  abstract class StandardScalerTransformOperation[T: TypeInformation: ClassTag]
-    extends TransformOperation[
-        StandardScaler,
-        (linalg.Vector[Double], linalg.Vector[Double]),
-        T,
-        T] {
-
-    var mean: Double = _
-    var std: Double = _
-
-    override def getModel(
-      instance: StandardScaler,
-      transformParameters: ParameterMap)
-    : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
-      mean = transformParameters(Mean)
-      std = transformParameters(Std)
-
-      instance.metricsOption match {
-        case Some(metrics) => metrics
-        case None =>
-          throw new RuntimeException("The StandardScaler has not been fitted to the data. " +
-            "This is necessary to estimate the mean and standard deviation of the data.")
-      }
-    }
-
-    def scale[V <: Vector: BreezeVectorConverter](
-      vector: V,
-      model: (linalg.Vector[Double], linalg.Vector[Double]))
-    : V = {
-      val (broadcastMean, broadcastStd) = model
-      var myVector = vector.asBreeze
-      myVector -= broadcastMean
-      myVector :/= broadcastStd
-      myVector = (myVector :* std) + mean
-      myVector.fromBreeze
-    }
-  }
-
-  /** [[TransformOperation]] to transform [[Vector]] types
-    *
-    * @tparam T Subtype of [[Vector]]
-    * @return [[TransformOperation]] scaling subtypes of [[Vector]]
-    */
-  implicit def transformVectors[T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = {
-    new StandardScalerTransformOperation[T]() {
-      override def transform(
-          vector: T,
-          model: (linalg.Vector[Double], linalg.Vector[Double]))
-        : T = {
-        scale(vector, model)
-      }
-    }
-  }
-
-  /** [[TransformOperation]] to transform tuples of type ([[Vector]], [[Double]]).
-    *
-    * @tparam T Subtype of [[Vector]]
-    * @return [[TransformOperation]] scaling the vector component of the tuple
-    */
-  implicit def transformTupleVectorDouble[
-      T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = {
-    new StandardScalerTransformOperation[(T, Double)] {
-      override def transform(
-          element: (T, Double),
-          model: (linalg.Vector[Double], linalg.Vector[Double]))
-        : (T, Double) = {
-        (scale(element._1, model), element._2)
-      }
-    }
-  }
-
-  /** [[TransformOperation]] to transform [[LabeledVector]].
-    *
-    */
-  implicit val transformLabeledVector = new StandardScalerTransformOperation[LabeledVector] {
-    override def transform(
-        element: LabeledVector,
-        model: (linalg.Vector[Double], linalg.Vector[Double]))
-      : LabeledVector = {
-      val LabeledVector(label, vector) = element
-
-      LabeledVector(label, scale(vector, model))
-    }
-  }
-}
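
The reduce step in extractFeatureMetrics above merges partial (count, sum, sum
of squared deviations) triples using formula 1.5b of the cited report. A
minimal scalar sketch of that merge in plain Scala, assuming the same
population-variance convention as the removed code; the object name and demo
values are illustrative:

{{{
object StandardScalerSketch {
  // (count, sum, sum of squared deviations from the mean)
  type Stats = (Double, Double, Double)

  // Formula 1.5b merge, the scalar analogue of the reduce in
  // extractFeatureMetrics: d corresponds to tempVector, the final
  // term to tempS.
  def merge(a: Stats, b: Stats): Stats = {
    val (n1, s1, q1) = a
    val (n2, s2, q2) = b
    val d = s1 * (n2 / n1) - s2 // (n/m)*T1 - T2 in the report's notation
    (n1 + n2, s1 + s2, q1 + q2 + d * d * n1 / (n2 * (n1 + n2)))
  }

  def main(args: Array[String]): Unit = {
    // per-element stats for {0.0, 2.0, 4.0}, merged pairwise
    val (n, s, q) = List(0.0, 2.0, 4.0).map(x => (1.0, x, 0.0)).reduce(merge)
    println(s"mean=${s / n}, std=${math.sqrt(q / n)}") // mean=2.0, std~1.633
  }
}
}}}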
