Repository: flink
Updated Branches:
  refs/heads/master d92e422ec -> 62938c110


[FLINK-2342] [ml] Add a new fit operation for (Vector, Double) tuples to StandardScaler

  - Add more tests to `StandardScalerITSuite`
  - Refactor `StandardScalerITSuite`

This closes #899.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62938c11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62938c11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62938c11

Branch: refs/heads/master
Commit: 62938c11016679b654506a0da67807356f64be60
Parents: d92e422
Author: Theodore Vasiloudis <t...@sics.se>
Authored: Fri Jul 10 11:34:32 2015 +0200
Committer: Chiwan Park <chiwanp...@apache.org>
Committed: Thu Jan 14 18:14:38 2016 +0900

----------------------------------------------------------------------
 .../flink/ml/preprocessing/StandardScaler.scala | 50 ++++++--------
 .../preprocessing/StandardScalerITSuite.scala   | 73 +++++++++++---------
 2 files changed, 64 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62938c11/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
index c62657f..82e8abf 100644
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
@@ -145,6 +145,27 @@ object StandardScaler {
     }
   }
 
+  /** Trains the [[StandardScaler]] by learning the mean and standard deviation of the training
+    * data which is of type ([[Vector]], Double). Only the vector component is used: the Double
+    * of each tuple is discarded before the metrics are computed. The learned mean and standard
+    * deviation are stored in the scaler and used to transform the given input data.
+    */
+  implicit def fitLabelVectorTupleStandardScaler
+  [T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = {
+    new FitOperation[StandardScaler, (T, Double)] {
+      override def fit(
+          instance: StandardScaler,
+          fitParameters: ParameterMap,
+          input: DataSet[(T, Double)])
+      : Unit = {
+        val vectorDS = input.map(_._1)
+        val metrics = extractFeatureMetrics(vectorDS)
+
+        instance.metricsOption = Some(metrics)
+      }
+    }
+  }
+
   /** Calculates in one pass over the data the features' mean and standard 
deviation.
     * For the calculation of the Standard deviation with one pass over the 
data,
     * the Youngs & Cramer algorithm was used:
@@ -240,8 +261,8 @@ object StandardScaler {
   implicit def transformVectors[T <: Vector: BreezeVectorConverter: 
TypeInformation: ClassTag] = {
     new StandardScalerTransformOperation[T]() {
       override def transform(
-            vector: T,
-            model: (linalg.Vector[Double], linalg.Vector[Double]))
+          vector: T,
+          model: (linalg.Vector[Double], linalg.Vector[Double]))
         : T = {
         scale(vector, model)
       }
@@ -278,29 +299,4 @@ object StandardScaler {
       LabeledVector(label, scale(vector, model))
     }
   }
-
-  /** Scales the given vector such that it has the given mean and std
-    *
-    * @param vector Vector to be scaled
-    * @param dataMean Mean of the training data
-    * @param dataStd Standard deviation of the training data
-    * @param mean Mean of the scaled data
-    * @param std Standard deviation of the scaled data
-    * @tparam T Type of [[Vector]]
-    * @return Scaled vector
-    */
-  private def scaleVector[T <: Vector: BreezeVectorConverter](
-      vector: T,
-      dataMean: linalg.Vector[Double],
-      dataStd: linalg.Vector[Double],
-      mean: Double,
-      std: Double)
-    : T = {
-    var myVector = vector.asBreeze
-
-    myVector -= dataMean
-    myVector :/= dataStd
-    myVector = (myVector :* std) + mean
-    myVector.fromBreeze
-  }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62938c11/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
index 30875b3..5cd253d 100644
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala
@@ -21,7 +21,8 @@ import breeze.linalg
 import breeze.numerics.sqrt
 import breeze.numerics.sqrt._
 import org.apache.flink.api.scala._
-import org.apache.flink.ml.math.{Vector, DenseVector}
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector}
 import org.apache.flink.test.util.FlinkTestBase
 import org.apache.flink.ml.math.Breeze._
 import org.scalatest._
@@ -36,15 +37,10 @@ class StandardScalerITSuite
 
   import StandardScalerData._
 
-  it should "scale the vectors to have mean equal to 0 and std equal to 1" in {
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val dataSet = env.fromCollection(data)
-    val scaler = StandardScaler()
-    scaler.fit(dataSet)
-    val scaledVectors = scaler.transform(dataSet).collect
-
+  def checkVectors(
+      scaledVectors: Seq[FlinkVector],
+      expectedMean: Double,
+      expectedStd: Double): Unit = {
     scaledVectors.length should equal(data.length)
 
     val numberOfFeatures = scaledVectors(0).size
@@ -64,11 +60,23 @@ class StandardScalerITSuite
     scaledStd = sqrt(scaledStd)
 
     for (i <- 0 until numberOfFeatures) {
-      scaledMean(i) should be(0.0 +- (0.0000000000001))
-      scaledStd(i) should be(1.0 +- (0.0000000000001))
+      scaledMean(i) should be(expectedMean +- 1e-9)
+      scaledStd(i) should be(expectedStd +- 1e-9)
     }
   }
 
+  it should "scale the vectors to have mean equal to 0 and std equal to 1" in {
+    // No setMean/setStd calls: the scaler's defaults are expected to yield mean 0 and std 1
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val dataSet = env.fromCollection(data)
+    val scaler = StandardScaler()
+    scaler.fit(dataSet)
+    val scaledVectors = scaler.transform(dataSet).collect()
+
+    checkVectors(scaledVectors, 0.0, 1.0)
+  }
+
   it should "scale the vectors to have mean equal to 10 and standard deviation 
equal to 2" in {
 
     val env = ExecutionEnvironment.getExecutionEnvironment
@@ -76,37 +84,38 @@ class StandardScalerITSuite
     val dataSet = env.fromCollection(data)
     val scaler = StandardScaler().setMean(10.0).setStd(2.0)
     scaler.fit(dataSet)
-    val scaledVectors = scaler.transform(dataSet).collect
+    val scaledVectors = scaler.transform(dataSet).collect()
 
-    scaledVectors.length should equal(data.length)
+    checkVectors(scaledVectors, 10.0, 2.0)
+  }
 
-    val numberOfFeatures = scaledVectors(0).size
-    var scaledMean: linalg.Vector[Double] = 
linalg.DenseVector.zeros(numberOfFeatures)
-    var scaledStd: linalg.Vector[Double] = 
linalg.DenseVector.zeros(numberOfFeatures)
+  it should "work with LabeledVector" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
 
-    for (vector <- scaledVectors) {
-      scaledMean += vector.asBreeze
-    }
+    val dataSet = env.fromCollection(data).map(v => LabeledVector(1.0, v))
+    val scaler = StandardScaler()
+    scaler.fit(dataSet)
+    val scaledVectors = scaler.transform(dataSet).map(lv => 
lv.vector).collect()
 
-    scaledMean /= scaledVectors.size.asInstanceOf[Double]
+    checkVectors(scaledVectors, 0.0, 1.0)
+  }
 
-    for (vector <- scaledVectors) {
-      val temp = vector.asBreeze - scaledMean
-      scaledStd += temp :* temp
-    }
-    scaledStd /= scaledVectors.size.asInstanceOf[Double]
-    scaledStd = sqrt(scaledStd)
+  it should "work with (FlinkVector, Double) tuples" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
 
-    for (i <- 0 until numberOfFeatures) {
-      scaledMean(i) should be(10.0 +- (0.0000000000001))
-      scaledStd(i) should be(2.0 +- (0.0000000000001))
-    }
+    val dataSet = env.fromCollection(data).map(v => (v, 1.0))
+    val scaler = StandardScaler()
+    scaler.fit(dataSet)
+    val scaledVectors = scaler.transform(dataSet).map(vl => vl._1).collect()
+
+    checkVectors(scaledVectors, 0.0, 1.0)
   }
 }
 
 object StandardScalerData {
 
-  val data: Seq[Vector] = List(DenseVector(Array(2104.00, 3.00)),
+  val data: Seq[FlinkVector] = List(
+    DenseVector(Array(2104.00, 3.00)),
     DenseVector(Array(1600.00, 3.00)),
     DenseVector(Array(2400.00, 3.00)),
     DenseVector(Array(1416.00, 2.00)),

Reply via email to