Repository: flink Updated Branches: refs/heads/master d92e422ec -> 62938c110
[FLINK-2342] [ml] Add a new fit operation for Vector with Double value to StandardScaler - Add more tests to `StandardScalerITSuite` - Refactor `StandardScalerITSuite` This closes #899. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62938c11 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62938c11 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62938c11 Branch: refs/heads/master Commit: 62938c11016679b654506a0da67807356f64be60 Parents: d92e422 Author: Theodore Vasiloudis <t...@sics.se> Authored: Fri Jul 10 11:34:32 2015 +0200 Committer: Chiwan Park <chiwanp...@apache.org> Committed: Thu Jan 14 18:14:38 2016 +0900 ---------------------------------------------------------------------- .../flink/ml/preprocessing/StandardScaler.scala | 50 ++++++-------- .../preprocessing/StandardScalerITSuite.scala | 73 +++++++++++--------- 2 files changed, 64 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/62938c11/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala index c62657f..82e8abf 100644 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala +++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala @@ -145,6 +145,27 @@ object StandardScaler { } } + /** Trains the [[StandardScaler]] by learning the mean and standard deviation of the training + * data which is of type ([[Vector]], Double). The mean and standard deviation are used to + * transform the given input data. + * + */ + implicit def fitLabelVectorTupleStandardScaler + [T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = { + new FitOperation[StandardScaler, (T, Double)] { + override def fit( + instance: StandardScaler, + fitParameters: ParameterMap, + input: DataSet[(T, Double)]) + : Unit = { + val vectorDS = input.map(_._1) + val metrics = extractFeatureMetrics(vectorDS) + + instance.metricsOption = Some(metrics) + } + } + } + /** Calculates in one pass over the data the features' mean and standard deviation. * For the calculation of the Standard deviation with one pass over the data, * the Youngs & Cramer algorithm was used: @@ -240,8 +261,8 @@ object StandardScaler { implicit def transformVectors[T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = { new StandardScalerTransformOperation[T]() { override def transform( - vector: T, - model: (linalg.Vector[Double], linalg.Vector[Double])) + vector: T, + model: (linalg.Vector[Double], linalg.Vector[Double])) : T = { scale(vector, model) } @@ -278,29 +299,4 @@ object StandardScaler { LabeledVector(label, scale(vector, model)) } } - - /** Scales the given vector such that it has the given mean and std - * - * @param vector Vector to be scaled - * @param dataMean Mean of the training data - * @param dataStd Standard deviation of the training data - * @param mean Mean of the scaled data - * @param std Standard deviation of the scaled data - * @tparam T Type of [[Vector]] - * @return Scaled vector - */ - private def scaleVector[T <: Vector: BreezeVectorConverter]( - vector: T, - dataMean: linalg.Vector[Double], - dataStd: linalg.Vector[Double], - mean: Double, - std: Double) - : T = { - var myVector = vector.asBreeze - - myVector -= dataMean - myVector :/= dataStd - myVector = (myVector :* std) + mean - myVector.fromBreeze - } } http://git-wip-us.apache.org/repos/asf/flink/blob/62938c11/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala index 30875b3..5cd253d 100644 --- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala +++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/preprocessing/StandardScalerITSuite.scala @@ -21,7 +21,8 @@ import breeze.linalg import breeze.numerics.sqrt import breeze.numerics.sqrt._ import org.apache.flink.api.scala._ -import org.apache.flink.ml.math.{Vector, DenseVector} +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.math.{Vector => FlinkVector, DenseVector} import org.apache.flink.test.util.FlinkTestBase import org.apache.flink.ml.math.Breeze._ import org.scalatest._ @@ -36,15 +37,10 @@ class StandardScalerITSuite import StandardScalerData._ - it should "scale the vectors to have mean equal to 0 and std equal to 1" in { - - val env = ExecutionEnvironment.getExecutionEnvironment - - val dataSet = env.fromCollection(data) - val scaler = StandardScaler() - scaler.fit(dataSet) - val scaledVectors = scaler.transform(dataSet).collect - + def checkVectors( + scaledVectors: Seq[FlinkVector], + expectedMean: Double, + expectedStd: Double): Unit = { scaledVectors.length should equal(data.length) val numberOfFeatures = scaledVectors(0).size @@ -64,11 +60,23 @@ class StandardScalerITSuite scaledStd = sqrt(scaledStd) for (i <- 0 until numberOfFeatures) { - scaledMean(i) should be(0.0 +- (0.0000000000001)) - scaledStd(i) should be(1.0 +- (0.0000000000001)) + scaledMean(i) should be(expectedMean +- 1e-9) + scaledStd(i) should be(expectedStd +- 1e-9) } } + it should "scale the vectors to have mean equal to 0 and std equal to 1" in { + + val env = ExecutionEnvironment.getExecutionEnvironment + + val dataSet = env.fromCollection(data) + val scaler = StandardScaler() + scaler.fit(dataSet) + val scaledVectors = scaler.transform(dataSet).collect() + + checkVectors(scaledVectors, 0.0, 1.0) + } + it should "scale the vectors to have mean equal to 10 and standard deviation equal to 2" in { val env = ExecutionEnvironment.getExecutionEnvironment @@ -76,37 +84,38 @@ class StandardScalerITSuite val dataSet = env.fromCollection(data) val scaler = StandardScaler().setMean(10.0).setStd(2.0) scaler.fit(dataSet) - val scaledVectors = scaler.transform(dataSet).collect + val scaledVectors = scaler.transform(dataSet).collect() - scaledVectors.length should equal(data.length) + checkVectors(scaledVectors, 10.0, 2.0) + } - val numberOfFeatures = scaledVectors(0).size - var scaledMean: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures) - var scaledStd: linalg.Vector[Double] = linalg.DenseVector.zeros(numberOfFeatures) + it should "work with LabeledVector" in { + val env = ExecutionEnvironment.getExecutionEnvironment - for (vector <- scaledVectors) { - scaledMean += vector.asBreeze - } + val dataSet = env.fromCollection(data).map(v => LabeledVector(1.0, v)) + val scaler = StandardScaler() + scaler.fit(dataSet) + val scaledVectors = scaler.transform(dataSet).map(lv => lv.vector).collect() - scaledMean /= scaledVectors.size.asInstanceOf[Double] + checkVectors(scaledVectors, 0.0, 1.0) + } - for (vector <- scaledVectors) { - val temp = vector.asBreeze - scaledMean - scaledStd += temp :* temp - } - scaledStd /= scaledVectors.size.asInstanceOf[Double] - scaledStd = sqrt(scaledStd) + it should "work with (FlinkVector, Double) tuples" in { + val env = ExecutionEnvironment.getExecutionEnvironment - for (i <- 0 until numberOfFeatures) { - scaledMean(i) should be(10.0 +- (0.0000000000001)) - scaledStd(i) should be(2.0 +- (0.0000000000001)) - } + val dataSet = env.fromCollection(data).map(v => (v, 1.0)) + val scaler = StandardScaler() + scaler.fit(dataSet) + val scaledVectors = scaler.transform(dataSet).map(vl => vl._1).collect() + + checkVectors(scaledVectors, 0.0, 1.0) } } object StandardScalerData { - val data: Seq[Vector] = List(DenseVector(Array(2104.00, 3.00)), + val data: Seq[FlinkVector] = List( + DenseVector(Array(2104.00, 3.00)), DenseVector(Array(1600.00, 3.00)), DenseVector(Array(2400.00, 3.00)), DenseVector(Array(1416.00, 2.00)),