http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala deleted file mode 100644 index 39a031f..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.ml.optimization - -import org.apache.flink.api.scala.DataSet -import org.apache.flink.ml.common._ -import org.apache.flink.ml.math.{SparseVector, DenseVector} -import org.apache.flink.api.scala._ -import org.apache.flink.ml.optimization.IterativeSolver._ - -/** Base class for optimization algorithms - * - */ -abstract class Solver extends Serializable with WithParameters { - import Solver._ - - /** Provides a solution for the given optimization problem - * - * @param data A Dataset of LabeledVector (input, output) pairs - * @param initialWeights The initial weight that will be optimized - * @return A Vector of weights optimized to the given problem - */ - def optimize( - data: DataSet[LabeledVector], - initialWeights: Option[DataSet[WeightVector]]) - : DataSet[WeightVector] - - /** Creates initial weights vector, creating a DataSet with a WeightVector element - * - * @param initialWeights An Option that may contain an initial set of weights - * @param data The data for which we optimize the weights - * @return A DataSet containing a single WeightVector element - */ - def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]], - data: DataSet[LabeledVector]): DataSet[WeightVector] = { - // TODO: Faster way to do this? - val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b) - - initialWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) => - wvDS.map { - wv => { - val denseWeights = wv.weights match { - case dv: DenseVector => dv - case sv: SparseVector => sv.toDenseVector - } - WeightVector(denseWeights, wv.intercept) - } - } - case None => createInitialWeightVector(dimensionsDS) - } - } - - /** Creates a DataSet with one zero vector. The zero vector has dimension d, which is given - * by the dimensionDS. 
- * - * @param dimensionDS DataSet with one element d, denoting the dimension of the returned zero - * vector - * @return DataSet of a zero vector of dimension d - */ - def createInitialWeightVector(dimensionDS: DataSet[Int]): DataSet[WeightVector] = { - dimensionDS.map { - dimension => - val values = Array.fill(dimension)(0.0) - new WeightVector(DenseVector(values), 0.0) - } - } - - //Setters for parameters - // TODO(tvas): Provide an option to fit an intercept or not - def setLossFunction(lossFunction: LossFunction): this.type = { - parameters.add(LossFunction, lossFunction) - this - } - - def setRegularizationConstant(regularizationConstant: Double): this.type = { - parameters.add(RegularizationConstant, regularizationConstant) - this - } -} - -object Solver { - // Define parameters for Solver - case object LossFunction extends Parameter[LossFunction] { - // TODO(tvas): Should depend on problem, here is where differentiating between classification - // and regression could become useful - val defaultValue = None - } - - case object RegularizationConstant extends Parameter[Double] { - val defaultValue = Some(0.0) // TODO(tvas): Properly initialize this, ensure Parameter > 0! 
- } -} - -/** An abstract class for iterative optimization algorithms - * - * See [[https://en.wikipedia.org/wiki/Iterative_method Iterative Methods on Wikipedia]] for more - * info - */ -abstract class IterativeSolver() extends Solver { - - //Setters for parameters - def setIterations(iterations: Int): this.type = { - parameters.add(Iterations, iterations) - this - } - - def setStepsize(stepsize: Double): this.type = { - parameters.add(LearningRate, stepsize) - this - } - - def setConvergenceThreshold(convergenceThreshold: Double): this.type = { - parameters.add(ConvergenceThreshold, convergenceThreshold) - this - } -} - -object IterativeSolver { - - val MAX_DLOSS: Double = 1e12 - - // Define parameters for IterativeSolver - case object LearningRate extends Parameter[Double] { - val defaultValue = Some(0.1) - } - - case object Iterations extends Parameter[Int] { - val defaultValue = Some(10) - } - - case object ConvergenceThreshold extends Parameter[Double] { - val defaultValue = None - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala deleted file mode 100644 index 554e155..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/package.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink - -import org.apache.flink.api.common.functions.{RichFilterFunction, RichMapFunction} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.java.operators.DataSink -import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} -import org.apache.flink.configuration.Configuration -import org.apache.flink.ml.common.LabeledVector - -import scala.reflect.ClassTag - -package object ml { - - /** Pimp my [[ExecutionEnvironment]] to directly support `readLibSVM` - * - * @param executionEnvironment - */ - implicit class RichExecutionEnvironment(executionEnvironment: ExecutionEnvironment) { - def readLibSVM(path: String): DataSet[LabeledVector] = { - MLUtils.readLibSVM(executionEnvironment, path) - } - } - - /** Pimp my [[DataSet]] to directly support `writeAsLibSVM` - * - * @param dataSet - */ - implicit class RichLabeledDataSet(dataSet: DataSet[LabeledVector]) { - def writeAsLibSVM(path: String): DataSink[String] = { - MLUtils.writeLibSVM(path, dataSet) - } - } - - implicit class RichDataSet[T](dataSet: DataSet[T]) { - def mapWithBcVariable[B, O: TypeInformation: ClassTag]( - broadcastVariable: DataSet[B])( - fun: (T, B) => O) - : DataSet[O] = { - dataSet.map(new BroadcastSingleElementMapper[T, B, O](dataSet.clean(fun))) - .withBroadcastSet(broadcastVariable, "broadcastVariable") - } - - def filterWithBcVariable[B, O](broadcastVariable: DataSet[B])(fun: (T, B) => Boolean) - : DataSet[T] = { - dataSet.filter(new BroadcastSingleElementFilter[T, B](dataSet.clean(fun))) - .withBroadcastSet(broadcastVariable, "broadcastVariable") - } - - def mapWithBcVariableIteration[B, O: TypeInformation: ClassTag]( - broadcastVariable: DataSet[B])(fun: (T, B, Int) => O) - : DataSet[O] = { - dataSet.map(new BroadcastSingleElementMapperWithIteration[T, B, O](dataSet.clean(fun))) - .withBroadcastSet(broadcastVariable, "broadcastVariable") - } - } - - private class BroadcastSingleElementMapper[T, B, O]( - fun: (T, B) => O) 
- extends RichMapFunction[T, O] { - var broadcastVariable: B = _ - - @throws(classOf[Exception]) - override def open(configuration: Configuration): Unit = { - broadcastVariable = getRuntimeContext.getBroadcastVariable[B]("broadcastVariable").get(0) - } - - override def map(value: T): O = { - fun(value, broadcastVariable) - } - } - - private class BroadcastSingleElementMapperWithIteration[T, B, O]( - fun: (T, B, Int) => O) - extends RichMapFunction[T, O] { - var broadcastVariable: B = _ - - @throws(classOf[Exception]) - override def open(configuration: Configuration): Unit = { - broadcastVariable = getRuntimeContext.getBroadcastVariable[B]("broadcastVariable").get(0) - } - - override def map(value: T): O = { - fun(value, broadcastVariable, getIterationRuntimeContext.getSuperstepNumber) - } - } - - private class BroadcastSingleElementFilter[T, B]( - fun: (T, B) => Boolean) - extends RichFilterFunction[T] { - var broadcastVariable: B = _ - - @throws(classOf[Exception]) - override def open(configuration: Configuration): Unit = { - broadcastVariable = getRuntimeContext.getBroadcastVariable[B]("broadcastVariable").get(0) - } - - override def filter(value: T): Boolean = { - fun(value, broadcastVariable) - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala deleted file mode 100644 index bf5a8b2..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedPredictor.scala +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.ml.pipeline - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala.DataSet -import org.apache.flink.ml.common.ParameterMap - -/** [[Predictor]] which represents a pipeline of possibly multiple [[Transformer]] and a trailing - * [[Predictor]]. - * - * The [[ChainedPredictor]] can be used as a regular [[Predictor]]. Upon calling the fit method, - * the input data is piped through all preceding [[Transformer]] in the pipeline and the resulting - * data is given to the trailing [[Predictor]]. The same holds true for the predict operation. - * - * The pipeline mechanism has been inspired by scikit-learn - * - * @param transformer Preceding [[Transformer]] of the pipeline - * @param predictor Trailing [[Predictor]] of the pipeline - * @tparam T Type of the preceding [[Transformer]] - * @tparam P Type of the trailing [[Predictor]] - */ -case class ChainedPredictor[T <: Transformer[T], P <: Predictor[P]](transformer: T, predictor: P) - extends Predictor[ChainedPredictor[T, P]]{} - -object ChainedPredictor{ - - /** [[PredictDataSetOperation]] for the [[ChainedPredictor]]. 
- * - * The [[PredictDataSetOperation]] requires the [[TransformDataSetOperation]] of the preceding - * [[Transformer]] and the [[PredictDataSetOperation]] of the trailing [[Predictor]]. Upon - * calling predict, the testing data is first transformed by the preceding [[Transformer]] and - * the result is then used to calculate the prediction via the trailing [[Predictor]]. - * - * @param transformOperation [[TransformDataSetOperation]] for the preceding [[Transformer]] - * @param predictOperation [[PredictDataSetOperation]] for the trailing [[Predictor]] - * @tparam T Type of the preceding [[Transformer]] - * @tparam P Type of the trailing [[Predictor]] - * @tparam Testing Type of the testing data - * @tparam Intermediate Type of the intermediate data produced by the preceding [[Transformer]] - * @tparam Prediction Type of the predicted data generated by the trailing [[Predictor]] - * @return - */ - implicit def chainedPredictOperation[ - T <: Transformer[T], - P <: Predictor[P], - Testing, - Intermediate, - Prediction]( - implicit transformOperation: TransformDataSetOperation[T, Testing, Intermediate], - predictOperation: PredictDataSetOperation[P, Intermediate, Prediction]) - : PredictDataSetOperation[ChainedPredictor[T, P], Testing, Prediction] = { - - new PredictDataSetOperation[ChainedPredictor[T, P], Testing, Prediction] { - override def predictDataSet( - instance: ChainedPredictor[T, P], - predictParameters: ParameterMap, - input: DataSet[Testing]) - : DataSet[Prediction] = { - - val testing = instance.transformer.transform(input, predictParameters) - instance.predictor.predict(testing, predictParameters) - } - } - } - - /** [[FitOperation]] for the [[ChainedPredictor]]. - * - * The [[FitOperation]] requires the [[FitOperation]] and the [[TransformDataSetOperation]] of - * the preceding [[Transformer]] as well as the [[FitOperation]] of the trailing [[Predictor]]. - * Upon calling fit, the preceding [[Transformer]] is first fitted to the training data. 
- * The training data is then transformed by the fitted [[Transformer]]. The transformed data - * is then used to fit the [[Predictor]]. - * - * @param fitOperation [[FitOperation]] of the preceding [[Transformer]] - * @param transformOperation [[TransformDataSetOperation]] of the preceding [[Transformer]] - * @param predictorFitOperation [[PredictDataSetOperation]] of the trailing [[Predictor]] - * @tparam L Type of the preceding [[Transformer]] - * @tparam R Type of the trailing [[Predictor]] - * @tparam I Type of the training data - * @tparam T Type of the intermediate data - * @return - */ - implicit def chainedFitOperation[L <: Transformer[L], R <: Predictor[R], I, T](implicit - fitOperation: FitOperation[L, I], - transformOperation: TransformDataSetOperation[L, I, T], - predictorFitOperation: FitOperation[R, T]): FitOperation[ChainedPredictor[L, R], I] = { - new FitOperation[ChainedPredictor[L, R], I] { - override def fit( - instance: ChainedPredictor[L, R], - fitParameters: ParameterMap, - input: DataSet[I]) - : Unit = { - instance.transformer.fit(input, fitParameters) - val intermediateResult = instance.transformer.transform(input, fitParameters) - instance.predictor.fit(intermediateResult, fitParameters) - } - } - } - - implicit def chainedEvaluationOperation[ - T <: Transformer[T], - P <: Predictor[P], - Testing, - Intermediate, - PredictionValue]( - implicit transformOperation: TransformDataSetOperation[T, Testing, Intermediate], - evaluateOperation: EvaluateDataSetOperation[P, Intermediate, PredictionValue], - testingTypeInformation: TypeInformation[Testing], - predictionValueTypeInformation: TypeInformation[PredictionValue]) - : EvaluateDataSetOperation[ChainedPredictor[T, P], Testing, PredictionValue] = { - new EvaluateDataSetOperation[ChainedPredictor[T, P], Testing, PredictionValue] { - override def evaluateDataSet( - instance: ChainedPredictor[T, P], - evaluateParameters: ParameterMap, - testing: DataSet[Testing]) - : DataSet[(PredictionValue, 
PredictionValue)] = { - val intermediate = instance.transformer.transform(testing, evaluateParameters) - instance.predictor.evaluate(intermediate, evaluateParameters) - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala deleted file mode 100644 index bdf917d..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/ChainedTransformer.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.ml.pipeline - -import org.apache.flink.api.scala.DataSet -import org.apache.flink.ml.common.ParameterMap - -/** [[Transformer]] which represents the chaining of two [[Transformer]]. - * - * A [[ChainedTransformer]] can be treated as regular [[Transformer]]. Upon calling the fit or - * transform operation, the data is piped through all [[Transformer]] of the pipeline. 
- * - * The pipeline mechanism has been inspired by scikit-learn - * - * @param left Left [[Transformer]] of the pipeline - * @param right Right [[Transformer]] of the pipeline - * @tparam L Type of the left [[Transformer]] - * @tparam R Type of the right [[Transformer]] - */ -case class ChainedTransformer[L <: Transformer[L], R <: Transformer[R]](left: L, right: R) - extends Transformer[ChainedTransformer[L, R]] { -} - -object ChainedTransformer{ - - /** [[TransformDataSetOperation]] implementation for [[ChainedTransformer]]. - * - * First the transform operation of the left [[Transformer]] is called with the input data. This - * generates intermediate data which is fed to the right [[Transformer]]'s transform operation. - * - * @param transformOpLeft [[TransformDataSetOperation]] for the left [[Transformer]] - * @param transformOpRight [[TransformDataSetOperation]] for the right [[Transformer]] - * @tparam L Type of the left [[Transformer]] - * @tparam R Type of the right [[Transformer]] - * @tparam I Type of the input data - * @tparam T Type of the intermediate output data - * @tparam O Type of the output data - * @return - */ - implicit def chainedTransformOperation[ - L <: Transformer[L], - R <: Transformer[R], - I, - T, - O](implicit - transformOpLeft: TransformDataSetOperation[L, I, T], - transformOpRight: TransformDataSetOperation[R, T, O]) - : TransformDataSetOperation[ChainedTransformer[L,R], I, O] = { - - new TransformDataSetOperation[ChainedTransformer[L, R], I, O] { - override def transformDataSet( - chain: ChainedTransformer[L, R], - transformParameters: ParameterMap, - input: DataSet[I]): DataSet[O] = { - val intermediateResult = transformOpLeft.transformDataSet( - chain.left, - transformParameters, - input) - transformOpRight.transformDataSet(chain.right, transformParameters, intermediateResult) - } - } - } - - /** [[FitOperation]] implementation for [[ChainedTransformer]]. 
- * - * First the fit operation of the left [[Transformer]] is called with the input data. Then - * the data is transformed by this [[Transformer]] and the given to the fit operation of the - * right [[Transformer]]. - * - * @param leftFitOperation [[FitOperation]] for the left [[Transformer]] - * @param leftTransformOperation [[TransformDataSetOperation]] for the left [[Transformer]] - * @param rightFitOperation [[FitOperation]] for the right [[Transformer]] - * @tparam L Type of the left [[Transformer]] - * @tparam R Type of the right [[Transformer]] - * @tparam I Type of the input data - * @tparam T Type of the intermediate output data - * @return - */ - implicit def chainedFitOperation[L <: Transformer[L], R <: Transformer[R], I, T](implicit - leftFitOperation: FitOperation[L, I], - leftTransformOperation: TransformDataSetOperation[L, I, T], - rightFitOperation: FitOperation[R, T]): FitOperation[ChainedTransformer[L, R], I] = { - new FitOperation[ChainedTransformer[L, R], I] { - override def fit( - instance: ChainedTransformer[L, R], - fitParameters: ParameterMap, - input: DataSet[I]): Unit = { - instance.left.fit(input, fitParameters) - val intermediateResult = instance.left.transform(input, fitParameters) - instance.right.fit(intermediateResult, fitParameters) - } - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala deleted file mode 100644 index dbe0782..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Estimator.scala +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.ml.pipeline - -import scala.reflect.ClassTag -import scala.reflect.runtime.universe._ - -import org.apache.flink.api.scala.DataSet -import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} - -/** Base trait for Flink's pipeline operators. - * - * An estimator can be fitted to input data. In order to do that the implementing class has - * to provide an implementation of a [[FitOperation]] with the correct input type. In order to make - * the [[FitOperation]] retrievable by the Scala compiler, the implementation should be placed - * in the companion object of the implementing class. - * - * The pipeline mechanism has been inspired by scikit-learn - * - * @tparam Self - */ -trait Estimator[Self] extends WithParameters { - that: Self => - - /** Fits the estimator to the given input data. The fitting logic is contained in the - * [[FitOperation]]. The computed state will be stored in the implementing class. 
- * - * @param training Training data - * @param fitParameters Additional parameters for the [[FitOperation]] - * @param fitOperation [[FitOperation]] which encapsulates the algorithm logic - * @tparam Training Type of the training data - * @return - */ - def fit[Training]( - training: DataSet[Training], - fitParameters: ParameterMap = ParameterMap.Empty)(implicit - fitOperation: FitOperation[Self, Training]): Unit = { - FlinkMLTools.registerFlinkMLTypes(training.getExecutionEnvironment) - fitOperation.fit(this, fitParameters, training) - } -} - -object Estimator{ - - /** Fallback [[FitOperation]] type class implementation which is used if no other - * [[FitOperation]] with the right input types could be found in the scope of the implementing - * class. The fallback [[FitOperation]] makes the system fail in the pre-flight phase by - * throwing a [[RuntimeException]] which states the reason for the failure. Usually the error - * is a missing [[FitOperation]] implementation for the input types or the wrong chaining - * of pipeline operators which have incompatible input/output types. - * - * @tparam Self Type of the pipeline operator - * @tparam Training Type of training data - * @return - */ - implicit def fallbackFitOperation[ - Self: TypeTag, - Training: TypeTag] - : FitOperation[Self, Training] = { - new FitOperation[Self, Training]{ - override def fit( - instance: Self, - fitParameters: ParameterMap, - input: DataSet[Training]) - : Unit = { - val self = typeOf[Self] - val training = typeOf[Training] - - throw new RuntimeException("There is no FitOperation defined for " + self + - " which trains on a DataSet[" + training + "]") - } - } - } - - /** Fallback [[PredictDataSetOperation]] if a [[Predictor]] is called with a not supported input - * data type. 
The fallback [[PredictDataSetOperation]] lets the system fail with a - * [[RuntimeException]] stating which input and output data types were inferred but for which no - * [[PredictDataSetOperation]] could be found. - * - * @tparam Self Type of the [[Predictor]] - * @tparam Testing Type of the testing data - * @return - */ - implicit def fallbackPredictOperation[ - Self: TypeTag, - Testing: TypeTag] - : PredictDataSetOperation[Self, Testing, Any] = { - new PredictDataSetOperation[Self, Testing, Any] { - override def predictDataSet( - instance: Self, - predictParameters: ParameterMap, - input: DataSet[Testing]) - : DataSet[Any] = { - val self = typeOf[Self] - val testing = typeOf[Testing] - - throw new RuntimeException("There is no PredictOperation defined for " + self + - " which takes a DataSet[" + testing + "] as input.") - } - } - } - - /** Fallback [[TransformDataSetOperation]] for [[Transformer]] which do not support the input or - * output type with which they are called. This is usualy the case if pipeline operators are - * chained which have incompatible input/output types. In order to detect these failures, the - * fallback [[TransformDataSetOperation]] throws a [[RuntimeException]] with the corresponding - * input/output types. Consequently, a wrong pipeline will be detected at pre-flight phase of - * Flink and thus prior to execution time. 
- * - * @tparam Self Type of the [[Transformer]] for which the [[TransformDataSetOperation]] is - * defined - * @tparam IN Input data type of the [[TransformDataSetOperation]] - * @return - */ - implicit def fallbackTransformOperation[ - Self: TypeTag, - IN: TypeTag] - : TransformDataSetOperation[Self, IN, Any] = { - new TransformDataSetOperation[Self, IN, Any] { - override def transformDataSet( - instance: Self, - transformParameters: ParameterMap, - input: DataSet[IN]) - : DataSet[Any] = { - val self = typeOf[Self] - val in = typeOf[IN] - - throw new RuntimeException("There is no TransformOperation defined for " + - self + " which takes a DataSet[" + in + - "] as input.") - } - } - } - - implicit def fallbackEvaluateOperation[ - Self: TypeTag, - Testing: TypeTag] - : EvaluateDataSetOperation[Self, Testing, Any] = { - new EvaluateDataSetOperation[Self, Testing, Any] { - override def evaluateDataSet( - instance: Self, - predictParameters: ParameterMap, - input: DataSet[Testing]) - : DataSet[(Any, Any)] = { - val self = typeOf[Self] - val testing = typeOf[Testing] - - throw new RuntimeException("There is no PredictOperation defined for " + self + - " which takes a DataSet[" + testing + "] as input.") - } - } - } -} - -/** Type class for the fit operation of an [[Estimator]]. - * - * The [[FitOperation]] contains a self type parameter so that the Scala compiler looks into - * the companion object of this class to find implicit values. 
- * - * @tparam Self Type of the [[Estimator]] subclass for which the [[FitOperation]] is defined - * @tparam Training Type of the training data - */ -trait FitOperation[Self, Training]{ - def fit(instance: Self, fitParameters: ParameterMap, input: DataSet[Training]): Unit -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala deleted file mode 100644 index 9d11cff..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Predictor.scala +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.ml.pipeline - -import org.apache.flink.api.common.typeinfo.TypeInformation - -import org.apache.flink.api.scala._ -import org.apache.flink.ml._ -import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} - -/** Predictor trait for Flink's pipeline operators. 
- * - * A [[Predictor]] calculates predictions for testing data based on the model it learned during - * the fit operation (training phase). In order to do that, the implementing class has to provide - * a [[FitOperation]] and a [[PredictDataSetOperation]] implementation for the correct types. The - * implicit values should be put into the scope of the companion object of the implementing class - * to make them retrievable for the Scala compiler. - * - * The pipeline mechanism has been inspired by scikit-learn - * - * @tparam Self Type of the implementing class - */ -trait Predictor[Self] extends Estimator[Self] with WithParameters { - that: Self => - - /** Predict testing data according the learned model. The implementing class has to provide - * a corresponding implementation of [[PredictDataSetOperation]] which contains the prediction - * logic. - * - * @param testing Testing data which shall be predicted - * @param predictParameters Additional parameters for the prediction - * @param predictor [[PredictDataSetOperation]] which encapsulates the prediction logic - * @tparam Testing Type of the testing data - * @tparam Prediction Type of the prediction data - * @return - */ - def predict[Testing, Prediction]( - testing: DataSet[Testing], - predictParameters: ParameterMap = ParameterMap.Empty)(implicit - predictor: PredictDataSetOperation[Self, Testing, Prediction]) - : DataSet[Prediction] = { - FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment) - predictor.predictDataSet(this, predictParameters, testing) - } - - /** Evaluates the testing data by computing the prediction value and returning a pair of true - * label value and prediction value. It is important that the implementation chooses a Testing - * type from which it can extract the true label value. 
- * - * @param testing - * @param evaluateParameters - * @param evaluator - * @tparam Testing - * @tparam PredictionValue - * @return - */ - def evaluate[Testing, PredictionValue]( - testing: DataSet[Testing], - evaluateParameters: ParameterMap = ParameterMap.Empty)(implicit - evaluator: EvaluateDataSetOperation[Self, Testing, PredictionValue]) - : DataSet[(PredictionValue, PredictionValue)] = { - FlinkMLTools.registerFlinkMLTypes(testing.getExecutionEnvironment) - evaluator.evaluateDataSet(this, evaluateParameters, testing) - } -} - -object Predictor { - - /** Default [[PredictDataSetOperation]] which takes a [[PredictOperation]] to calculate a tuple - * of testing element and its prediction value. - * - * Note: We have to put the TypeInformation implicit values for Testing and PredictionValue after - * the PredictOperation implicit parameter. Otherwise, if it's defined as a context bound, then - * the Scala compiler does not find the implicit [[PredictOperation]] value. - * - * @param predictOperation - * @param testingTypeInformation - * @param predictionValueTypeInformation - * @tparam Instance - * @tparam Model - * @tparam Testing - * @tparam PredictionValue - * @return - */ - implicit def defaultPredictDataSetOperation[ - Instance <: Estimator[Instance], - Model, - Testing, - PredictionValue]( - implicit predictOperation: PredictOperation[Instance, Model, Testing, PredictionValue], - testingTypeInformation: TypeInformation[Testing], - predictionValueTypeInformation: TypeInformation[PredictionValue]) - : PredictDataSetOperation[Instance, Testing, (Testing, PredictionValue)] = { - new PredictDataSetOperation[Instance, Testing, (Testing, PredictionValue)] { - override def predictDataSet( - instance: Instance, - predictParameters: ParameterMap, - input: DataSet[Testing]) - : DataSet[(Testing, PredictionValue)] = { - val resultingParameters = instance.parameters ++ predictParameters - - val model = predictOperation.getModel(instance, resultingParameters) - - 
implicit val resultTypeInformation = createTypeInformation[(Testing, PredictionValue)] - - input.mapWithBcVariable(model){ - (element, model) => { - (element, predictOperation.predict(element, model)) - } - } - } - } - } - - /** Default [[EvaluateDataSetOperation]] which takes a [[PredictOperation]] to calculate a tuple - * of true label value and predicted label value. - * - * Note: We have to put the TypeInformation implicit values for Testing and PredictionValue after - * the PredictOperation implicit parameter. Otherwise, if it's defined as a context bound, then - * the Scala compiler does not find the implicit [[PredictOperation]] value. - * - * @param predictOperation - * @param testingTypeInformation - * @param predictionValueTypeInformation - * @tparam Instance - * @tparam Model - * @tparam Testing - * @tparam PredictionValue - * @return - */ - implicit def defaultEvaluateDataSetOperation[ - Instance <: Estimator[Instance], - Model, - Testing, - PredictionValue]( - implicit predictOperation: PredictOperation[Instance, Model, Testing, PredictionValue], - testingTypeInformation: TypeInformation[Testing], - predictionValueTypeInformation: TypeInformation[PredictionValue]) - : EvaluateDataSetOperation[Instance, (Testing, PredictionValue), PredictionValue] = { - new EvaluateDataSetOperation[Instance, (Testing, PredictionValue), PredictionValue] { - override def evaluateDataSet( - instance: Instance, - evaluateParameters: ParameterMap, - testing: DataSet[(Testing, PredictionValue)]) - : DataSet[(PredictionValue, PredictionValue)] = { - val resultingParameters = instance.parameters ++ evaluateParameters - val model = predictOperation.getModel(instance, resultingParameters) - - implicit val resultTypeInformation = createTypeInformation[(Testing, PredictionValue)] - - testing.mapWithBcVariable(model){ - (element, model) => { - (element._2, predictOperation.predict(element._1, model)) - } - } - } - } - } -} - -/** Type class for the predict operation of 
[[Predictor]]. This predict operation works on DataSets. - * - * [[Predictor]]s either have to implement this trait or the [[PredictOperation]] trait. The - * implementation has to be made available as an implicit value or function in the scope of - * their companion objects. - * - * The first type parameter is the type of the implementing [[Predictor]] class so that the Scala - * compiler includes the companion object of this class in the search scope for the implicit - * values. - * - * @tparam Self Type of [[Predictor]] implementing class - * @tparam Testing Type of testing data - * @tparam Prediction Type of predicted data - */ -trait PredictDataSetOperation[Self, Testing, Prediction] extends Serializable{ - - /** Calculates the predictions for all elements in the [[DataSet]] input - * - * @param instance The Predictor instance that we will use to make the predictions - * @param predictParameters The parameters for the prediction - * @param input The DataSet containing the unlabeled examples - * @return - */ - def predictDataSet( - instance: Self, - predictParameters: ParameterMap, - input: DataSet[Testing]) - : DataSet[Prediction] -} - -/** Type class for predict operation. It takes an element and the model and then computes the - * prediction value for this element. - * - * It is sufficient for a [[Predictor]] to only implement this trait to support the evaluate and - * predict method. 
- * - * @tparam Instance The concrete type of the [[Predictor]] that we will use for predictions - * @tparam Model The representation of the predictive model for the algorithm, for example a - * Vector of weights - * @tparam Testing The type of the example that we will use to make the predictions (input) - * @tparam Prediction The type of the label that the prediction operation will produce (output) - * - */ -trait PredictOperation[Instance, Model, Testing, Prediction] extends Serializable{ - - /** Defines how to retrieve the model of the type for which this operation was defined - * - * @param instance The Predictor instance that we will use to make the predictions - * @param predictParameters The parameters for the prediction - * @return A DataSet with the model representation as its only element - */ - def getModel(instance: Instance, predictParameters: ParameterMap): DataSet[Model] - - /** Calculates the prediction for a single element given the model of the [[Predictor]]. - * - * @param value The unlabeled example on which we make the prediction - * @param model The model representation of the prediciton algorithm - * @return A label for the provided example of type [[Prediction]] - */ - def predict(value: Testing, model: Model): - Prediction -} - -/** Type class for the evaluate operation of [[Predictor]]. This evaluate operation works on - * DataSets. - * - * It takes a [[DataSet]] of some type. For each element of this [[DataSet]] the evaluate method - * computes the prediction value and returns a tuple of true label value and prediction value. 
- * - * @tparam Instance The concrete type of the Predictor instance that we will use to make the - * predictions - * @tparam Testing The type of the example that we will use to make the predictions (input) - * @tparam Prediction The type of the label that the prediction operation will produce (output) - * - */ -trait EvaluateDataSetOperation[Instance, Testing, Prediction] extends Serializable{ - def evaluateDataSet( - instance: Instance, - evaluateParameters: ParameterMap, - testing: DataSet[Testing]) - : DataSet[(Prediction, Prediction)] -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala deleted file mode 100644 index 014ad2b..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/pipeline/Transformer.scala +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.ml.pipeline - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala.DataSet -import org.apache.flink.ml._ -import org.apache.flink.ml.common.{FlinkMLTools, ParameterMap, WithParameters} - -import scala.reflect.ClassTag - -/** Transformer trait for Flink's pipeline operators. - * - * A Transformer transforms a [[DataSet]] of an input type into a [[DataSet]] of an output type. - * Furthermore, a [[Transformer]] is also an [[Estimator]], because some transformations depend - * on the training data. In order to do that the implementing class has to provide a - * [[TransformDataSetOperation]] and [[FitOperation]] implementation. The Scala compiler finds - * these implicit values if it is put in the scope of the companion object of the implementing - * class. - * - * [[Transformer]] can be chained with other [[Transformer]] and [[Predictor]] to create - * pipelines. These pipelines can consist of an arbitrary number of [[Transformer]] and at most - * one trailing [[Predictor]]. - * - * The pipeline mechanism has been inspired by scikit-learn - * - * @tparam Self - */ -trait Transformer[Self <: Transformer[Self]] - extends Estimator[Self] - with WithParameters - with Serializable { - that: Self => - - /** Transform operation which transforms an input [[DataSet]] of type I into an ouptut [[DataSet]] - * of type O. The actual transform operation is implemented within the - * [[TransformDataSetOperation]]. 
- * - * @param input Input [[DataSet]] of type I - * @param transformParameters Additional parameters for the [[TransformDataSetOperation]] - * @param transformOperation [[TransformDataSetOperation]] which encapsulates the algorithm's - * logic - * @tparam Input Input data type - * @tparam Output Ouptut data type - * @return - */ - def transform[Input, Output]( - input: DataSet[Input], - transformParameters: ParameterMap = ParameterMap.Empty) - (implicit transformOperation: TransformDataSetOperation[Self, Input, Output]) - : DataSet[Output] = { - FlinkMLTools.registerFlinkMLTypes(input.getExecutionEnvironment) - transformOperation.transformDataSet(that, transformParameters, input) - } - - /** Chains two [[Transformer]] to form a [[ChainedTransformer]]. - * - * @param transformer Right side transformer of the resulting pipeline - * @tparam T Type of the [[Transformer]] - * @return - */ - def chainTransformer[T <: Transformer[T]](transformer: T): ChainedTransformer[Self, T] = { - ChainedTransformer(this, transformer) - } - - /** Chains a [[Transformer]] with a [[Predictor]] to form a [[ChainedPredictor]]. 
- * - * @param predictor Trailing [[Predictor]] of the resulting pipeline - * @tparam P Type of the [[Predictor]] - * @return - */ - def chainPredictor[P <: Predictor[P]](predictor: P): ChainedPredictor[Self, P] = { - ChainedPredictor(this, predictor) - } -} - -object Transformer{ - implicit def defaultTransformDataSetOperation[ - Instance <: Estimator[Instance], - Model, - Input, - Output]( - implicit transformOperation: TransformOperation[Instance, Model, Input, Output], - outputTypeInformation: TypeInformation[Output], - outputClassTag: ClassTag[Output]) - : TransformDataSetOperation[Instance, Input, Output] = { - new TransformDataSetOperation[Instance, Input, Output] { - override def transformDataSet( - instance: Instance, - transformParameters: ParameterMap, - input: DataSet[Input]) - : DataSet[Output] = { - val resultingParameters = instance.parameters ++ transformParameters - val model = transformOperation.getModel(instance, resultingParameters) - - input.mapWithBcVariable(model){ - (element, model) => transformOperation.transform(element, model) - } - } - } - } -} - -/** Type class for a transform operation of [[Transformer]]. This works on [[DataSet]] of elements. - * - * The [[TransformDataSetOperation]] contains a self type parameter so that the Scala compiler - * looks into the companion object of this class to find implicit values. - * - * @tparam Instance Type of the [[Transformer]] for which the [[TransformDataSetOperation]] is - * defined - * @tparam Input Input data type - * @tparam Output Ouptut data type - */ -trait TransformDataSetOperation[Instance, Input, Output] extends Serializable{ - def transformDataSet( - instance: Instance, - transformParameters: ParameterMap, - input: DataSet[Input]) - : DataSet[Output] -} - -/** Type class for a transform operation which works on a single element and the corresponding model - * of the [[Transformer]]. 
- * - * @tparam Instance - * @tparam Model - * @tparam Input - * @tparam Output - */ -trait TransformOperation[Instance, Model, Input, Output] extends Serializable{ - - /** Retrieves the model of the [[Transformer]] for which this operation has been defined. - * - * @param instance - * @param transformParemters - * @return - */ - def getModel(instance: Instance, transformParemters: ParameterMap): DataSet[Model] - - /** Transforms a single element with respect to the model associated with the respective - * [[Transformer]] - * - * @param element - * @param model - * @return - */ - def transform(element: Input, model: Model): Output -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala deleted file mode 100644 index 217e2c2..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/MinMaxScaler.scala +++ /dev/null @@ -1,265 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.flink.ml.preprocessing - -import breeze.linalg -import breeze.linalg.{max, min} -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala._ -import org.apache.flink.ml._ -import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap} -import org.apache.flink.ml.math.Breeze._ -import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} -import org.apache.flink.ml.pipeline.{TransformDataSetOperation, FitOperation, -Transformer} -import org.apache.flink.ml.preprocessing.MinMaxScaler.{Max, Min} - -import scala.reflect.ClassTag - -/** Scales observations, so that all features are in a user-specified range. - * By default for [[MinMaxScaler]] transformer range = [0,1]. - * - * This transformer takes a subtype of [[Vector]] of values and maps it to a - * scaled subtype of [[Vector]] such that each feature lies between a user-specified range. - * - * This transformer can be prepended to all [[Transformer]] and - * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect as input a subtype - * of [[Vector]] or a [[LabeledVector]]. 
- * - * @example - * {{{ - * val trainingDS: DataSet[Vector] = env.fromCollection(data) - * val transformer = MinMaxScaler().setMin(-1.0) - * - * transformer.fit(trainingDS) - * val transformedDS = transformer.transform(trainingDS) - * }}} - * - * =Parameters= - * - * - [[Min]]: The minimum value of the range of the transformed data set; by default equal to 0 - * - [[Max]]: The maximum value of the range of the transformed data set; by default - * equal to 1 - */ -class MinMaxScaler extends Transformer[MinMaxScaler] { - - private [preprocessing] var metricsOption: Option[ - DataSet[(linalg.Vector[Double], linalg.Vector[Double])] - ] = None - - /** Sets the minimum for the range of the transformed data - * - * @param min the user-specified minimum value. - * @return the MinMaxScaler instance with its minimum value set to the user-specified value. - */ - def setMin(min: Double): MinMaxScaler = { - parameters.add(Min, min) - this - } - - /** Sets the maximum for the range of the transformed data - * - * @param max the user-specified maximum value. - * @return the MinMaxScaler instance with its minimum value set to the user-specified value. - */ - def setMax(max: Double): MinMaxScaler = { - parameters.add(Max, max) - this - } -} - -object MinMaxScaler { - - // ====================================== Parameters ============================================= - - case object Min extends Parameter[Double] { - override val defaultValue: Option[Double] = Some(0.0) - } - - case object Max extends Parameter[Double] { - override val defaultValue: Option[Double] = Some(1.0) - } - - // ==================================== Factory methods ========================================== - - def apply(): MinMaxScaler = { - new MinMaxScaler() - } - - // ====================================== Operations ============================================= - - /** Trains the [[MinMaxScaler]] by learning the minimum and maximum of each feature of the - * training data. 
These values are used in the transform step to transform the given input data. - * - * @tparam T Input data type which is a subtype of [[Vector]] - * @return [[FitOperation]] training the [[MinMaxScaler]] on subtypes of [[Vector]] - */ - implicit def fitVectorMinMaxScaler[T <: Vector] = new FitOperation[MinMaxScaler, T] { - override def fit(instance: MinMaxScaler, fitParameters: ParameterMap, input: DataSet[T]) - : Unit = { - val metrics = extractFeatureMinMaxVectors(input) - - instance.metricsOption = Some(metrics) - } - } - - /** Trains the [[MinMaxScaler]] by learning the minimum and maximum of the features of the - * training data which is of type [[LabeledVector]]. The minimum and maximum are used to - * transform the given input data. - * - */ - implicit val fitLabeledVectorMinMaxScaler = { - new FitOperation[MinMaxScaler, LabeledVector] { - override def fit( - instance: MinMaxScaler, - fitParameters: ParameterMap, - input: DataSet[LabeledVector]) - : Unit = { - val vectorDS = input.map(_.vector) - val metrics = extractFeatureMinMaxVectors(vectorDS) - - instance.metricsOption = Some(metrics) - } - } - } - - /** Calculates in one pass over the data the features' minimum and maximum values. - * - * @param dataSet The data set for which we want to calculate the minimum and maximum values. - * @return DataSet containing a single tuple of two vectors (minVector, maxVector). - * The first vector represents the minimum values vector and the second is the maximum - * values vector. 
- */ - private def extractFeatureMinMaxVectors[T <: Vector](dataSet: DataSet[T]) - : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = { - - val minMax = dataSet.map { - v => (v.asBreeze, v.asBreeze) - }.reduce { - (minMax1, minMax2) => { - - val tempMinimum = min(minMax1._1, minMax2._1) - val tempMaximum = max(minMax1._2, minMax2._2) - - (tempMinimum, tempMaximum) - } - } - minMax - } - - /** [[TransformDataSetOperation]] which scales input data of subtype of [[Vector]] with respect to - * the calculated minimum and maximum of the training data. The minimum and maximum - * values of the resulting data is configurable. - * - * @tparam T Type of the input and output data which has to be a subtype of [[Vector]] - * @return [[TransformDataSetOperation]] scaling subtypes of [[Vector]] such that the feature - * values are in the configured range - */ - implicit def transformVectors[T <: Vector : BreezeVectorConverter : TypeInformation : ClassTag] - = { - new TransformDataSetOperation[MinMaxScaler, T, T] { - override def transformDataSet( - instance: MinMaxScaler, - transformParameters: ParameterMap, - input: DataSet[T]) - : DataSet[T] = { - - val resultingParameters = instance.parameters ++ transformParameters - val min = resultingParameters(Min) - val max = resultingParameters(Max) - - instance.metricsOption match { - case Some(metrics) => { - input.mapWithBcVariable(metrics) { - (vector, metrics) => { - val (broadcastMin, broadcastMax) = metrics - scaleVector(vector, broadcastMin, broadcastMax, min, max) - } - } - } - - case None => - throw new RuntimeException("The MinMaxScaler has not been fitted to the data. 
" + - "This is necessary to estimate the minimum and maximum of the data.") - } - } - } - } - - implicit val transformLabeledVectors = { - new TransformDataSetOperation[MinMaxScaler, LabeledVector, LabeledVector] { - override def transformDataSet(instance: MinMaxScaler, - transformParameters: ParameterMap, - input: DataSet[LabeledVector]): DataSet[LabeledVector] = { - val resultingParameters = instance.parameters ++ transformParameters - val min = resultingParameters(Min) - val max = resultingParameters(Max) - - instance.metricsOption match { - case Some(metrics) => { - input.mapWithBcVariable(metrics) { - (labeledVector, metrics) => { - val (broadcastMin, broadcastMax) = metrics - val LabeledVector(label, vector) = labeledVector - - LabeledVector(label, scaleVector(vector, broadcastMin, broadcastMax, min, max)) - } - } - } - - case None => - throw new RuntimeException("The MinMaxScaler has not been fitted to the data. " + - "This is necessary to estimate the minimum and maximum of the data.") - } - } - } - } - - /** Scales a vector such that it's features lie in the range [min, max] - * - * @param vector Vector to scale - * @param broadcastMin Vector containing for each feature the minimal value in the training set - * @param broadcastMax Vector containing for each feature the maximal value in the training set - * @param min Minimal value of range - * @param max Maximal value of range - * @tparam T Type of [[Vector]] - * @return Scaled feature vector - */ - private def scaleVector[T <: Vector: BreezeVectorConverter]( - vector: T, - broadcastMin: linalg.Vector[Double], - broadcastMax: linalg.Vector[Double], - min: Double, - max: Double) - : T = { - var myVector = vector.asBreeze - - //handle the case where a feature takes only one value - val rangePerFeature = (broadcastMax - broadcastMin) - for (i <- 0 until rangePerFeature.size) { - if (rangePerFeature(i) == 0.0) { - rangePerFeature(i)= 1.0 - } - } - - myVector -= broadcastMin - myVector :/= rangePerFeature - 
myVector = (myVector :* (max - min)) + min - myVector.fromBreeze - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala deleted file mode 100644 index f1c788e..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/PolynomialFeatures.scala +++ /dev/null @@ -1,209 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.ml.preprocessing - -import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.api.scala.{DataSet, _} -import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap} -import org.apache.flink.ml.math.{Vector, VectorBuilder} -import org.apache.flink.ml.pipeline.{FitOperation, TransformDataSetOperation, Transformer} -import org.apache.flink.ml.preprocessing.PolynomialFeatures.Degree - -import scala.reflect.ClassTag - -/** Maps a vector into the polynomial feature space. - * - * This transformer takes a a vector of values `(x, y, z, ...)` and maps it into the - * polynomial feature space of degree `d`. That is to say, it calculates the following - * representation: - * - * `(x, y, z, x^2, xy, y^2, yz, z^2, x^3, x^2y, x^2z, xyz, ...)^T` - * - * This transformer can be prepended to all [[org.apache.flink.ml.pipeline.Transformer]] and - * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect an input of - * [[LabeledVector]]. - * - * @example - * {{{ - * val trainingDS: DataSet[LabeledVector] = ... 
- * - * val polyFeatures = PolynomialFeatures() - * .setDegree(3) - * - * val mlr = MultipleLinearRegression() - * - * val pipeline = polyFeatures.chainPredictor(mlr) - * - * pipeline.fit(trainingDS) - * }}} - * - * =Parameters= - * - * - [[org.apache.flink.ml.preprocessing.PolynomialFeatures.Degree]]: Maximum polynomial degree - */ -class PolynomialFeatures extends Transformer[PolynomialFeatures] { - - def setDegree(degree: Int): PolynomialFeatures = { - parameters.add(Degree, degree) - this - } -} - -object PolynomialFeatures{ - - // ====================================== Parameters ============================================= - - case object Degree extends Parameter[Int] { - override val defaultValue: Option[Int] = Some(1) - } - - // =================================== Factory methods =========================================== - - def apply(): PolynomialFeatures = { - new PolynomialFeatures() - } - - // ====================================== Operations ============================================= - - /** The [[PolynomialFeatures]] transformer does not need a fitting phase. - * - * @tparam T The fitting works with arbitrary input types - * @return - */ - implicit def fitNoOp[T] = { - new FitOperation[PolynomialFeatures, T]{ - override def fit( - instance: PolynomialFeatures, - fitParameters: ParameterMap, - input: DataSet[T]) - : Unit = {} - } - } - - /** [[org.apache.flink.ml.pipeline.TransformDataSetOperation]] to map a [[Vector]] into the - * polynomial feature space. 
- * - * @tparam T Subclass of [[Vector]] - * @return - */ - implicit def transformVectorIntoPolynomialBase[ - T <: Vector : VectorBuilder: TypeInformation: ClassTag - ] = { - new TransformDataSetOperation[PolynomialFeatures, T, T] { - override def transformDataSet( - instance: PolynomialFeatures, - transformParameters: ParameterMap, - input: DataSet[T]) - : DataSet[T] = { - val resultingParameters = instance.parameters ++ transformParameters - - val degree = resultingParameters(Degree) - - input.map { - vector => { - calculatePolynomial(degree, vector) - } - } - } - } - } - - /** [[org.apache.flink.ml.pipeline.TransformDataSetOperation]] to map a [[LabeledVector]] into the - * polynomial feature space - */ - implicit val transformLabeledVectorIntoPolynomialBase = - new TransformDataSetOperation[PolynomialFeatures, LabeledVector, LabeledVector] { - - override def transformDataSet( - instance: PolynomialFeatures, - transformParameters: ParameterMap, - input: DataSet[LabeledVector]) - : DataSet[LabeledVector] = { - val resultingParameters = instance.parameters ++ transformParameters - - val degree = resultingParameters(Degree) - - input.map { - labeledVector => { - val vector = labeledVector.vector - val label = labeledVector.label - - val transformedVector = calculatePolynomial(degree, vector) - - LabeledVector(label, transformedVector) - } - } - } - } - - - private def calculatePolynomial[T <: Vector: VectorBuilder](degree: Int, vector: T): T = { - val builder = implicitly[VectorBuilder[T]] - builder.build(calculateCombinedCombinations(degree, vector)) - } - - /** Calculates for a given vector its representation in the polynomial feature space. 
- * - * @param degree Maximum degree of polynomial - * @param vector Values of the polynomial variables - * @return List of polynomial values - */ - private def calculateCombinedCombinations(degree: Int, vector: Vector): List[Double] = { - if(degree == 0) { - List() - } else { - val partialResult = calculateCombinedCombinations(degree - 1, vector) - - val combinations = calculateCombinations(vector.size, degree) - - val result = combinations map { - combination => - combination.zipWithIndex.map{ - case (exp, idx) => math.pow(vector(idx), exp) - }.fold(1.0)(_ * _) - } - - result ::: partialResult - } - - } - - /** Calculates all possible combinations of a polynom of degree `value`, whereas the polynom - * can consist of up to `length` factors. The return value is the list of the exponents of the - * individual factors - * - * @param length maximum number of factors - * @param value degree of polynomial - * @return List of lists which contain the exponents of the individual factors - */ - private def calculateCombinations(length: Int, value: Int): List[List[Int]] = { - if(length == 0) { - List() - } else if (length == 1) { - List(List(value)) - } else { - value to 0 by -1 flatMap { - v => - calculateCombinations(length - 1, value - v) map { - v::_ - } - } toList - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e9bf13d8/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala deleted file mode 100644 index 82e8abf..0000000 --- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala +++ /dev/null @@ -1,302 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license 
agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.ml.preprocessing

import breeze.linalg
import breeze.numerics.sqrt
import breeze.numerics.sqrt._
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
import org.apache.flink.ml.math.Breeze._
import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation,
Transformer}
import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}

import scala.reflect.ClassTag

/** Scales observations, so that all features have a user-specified mean and standard deviation.
  * By default for [[StandardScaler]] transformer mean=0.0 and std=1.0.
  *
  * This transformer takes a subtype of [[Vector]] of values and maps it to a
  * scaled subtype of [[Vector]] such that each feature has a user-specified mean and standard
  * deviation.
  *
  * This transformer can be prepended to all [[Transformer]] and
  * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect as input a subtype
  * of [[Vector]].
  *
  * The per-feature mean and standard deviation of the training data are learned by `fit`;
  * `transform` fails with a RuntimeException if the scaler has not been fitted.
  *
  * @example
  *          {{{
  *            val trainingDS: DataSet[Vector] = env.fromCollection(data)
  *            val transformer = StandardScaler().setMean(10.0).setStd(2.0)
  *
  *            transformer.fit(trainingDS)
  *            val transformedDS = transformer.transform(trainingDS)
  *          }}}
  *
  * =Parameters=
  *
  * - [[Mean]]: The mean value of transformed data set; by default equal to 0
  * - [[Std]]: The standard deviation of the transformed data set; by default
  * equal to 1
  */
class StandardScaler extends Transformer[StandardScaler] {

  /** Per-feature (mean vector, standard deviation vector) of the training data, stored as a
    * single-element DataSet by the fit operations. `None` until the scaler has been fitted.
    */
  private[preprocessing] var metricsOption: Option[
      DataSet[(linalg.Vector[Double], linalg.Vector[Double])]
    ] = None

  /** Sets the target mean of the transformed data
    *
    * @param mu the user-specified mean value.
    * @return the StandardScaler instance with its mean value set to the user-specified value
    */
  def setMean(mu: Double): StandardScaler = {
    parameters.add(Mean, mu)
    this
  }

  /** Sets the target standard deviation of the transformed data
    *
    * @param std the user-specified std value. A value of 0.0 is ignored: the currently
    *            configured std (1.0 unless set before) stays in effect.
    * @return the StandardScaler instance (with its std value set to `std` unless `std` is 0.0)
    */
  def setStd(std: Double): StandardScaler = {
    // A zero target std would map every feature onto the target mean, so it is rejected
    // silently instead of being stored.
    if (std != 0.0) {
      parameters.add(Std, std)
    }
    this
  }
}

object StandardScaler {

  // ====================================== Parameters =============================================

  case object Mean extends Parameter[Double] {
    override val defaultValue: Option[Double] = Some(0.0)
  }

  case object Std extends Parameter[Double] {
    override val defaultValue: Option[Double] = Some(1.0)
  }

  // ==================================== Factory methods ==========================================

  def apply(): StandardScaler = {
    new StandardScaler()
  }

  // ====================================== Operations =============================================

  /** Trains the [[org.apache.flink.ml.preprocessing.StandardScaler]] by learning the mean and
    * standard deviation of the training data. These values are used in the transform step
    * to transform the given input data.
    *
    * @tparam T Input data type which is a subtype of [[Vector]]
    * @return a [[FitOperation]] that stores the learned metrics in the scaler instance
    */
  implicit def fitVectorStandardScaler[T <: Vector] = new FitOperation[StandardScaler, T] {
    override def fit(instance: StandardScaler, fitParameters: ParameterMap, input: DataSet[T])
      : Unit = {
      val metrics = extractFeatureMetrics(input)

      instance.metricsOption = Some(metrics)
    }
  }

  /** Trains the [[StandardScaler]] by learning the mean and standard deviation of the training
    * data which is of type [[LabeledVector]]. Only the feature vectors are used; labels are
    * ignored. The mean and standard deviation are used to transform the given input data.
    */
  implicit val fitLabeledVectorStandardScaler = {
    new FitOperation[StandardScaler, LabeledVector] {
      override def fit(
          instance: StandardScaler,
          fitParameters: ParameterMap,
          input: DataSet[LabeledVector])
        : Unit = {
        val vectorDS = input.map(_.vector)
        val metrics = extractFeatureMetrics(vectorDS)

        instance.metricsOption = Some(metrics)
      }
    }
  }

  /** Trains the [[StandardScaler]] by learning the mean and standard deviation of the training
    * data which is of type ([[Vector]], Double). Only the vector component is used. The mean
    * and standard deviation are used to transform the given input data.
    */
  implicit def fitLabelVectorTupleStandardScaler
  [T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = {
    new FitOperation[StandardScaler, (T, Double)] {
      override def fit(
          instance: StandardScaler,
          fitParameters: ParameterMap,
          input: DataSet[(T, Double)])
        : Unit = {
        val vectorDS = input.map(_._1)
        val metrics = extractFeatureMetrics(vectorDS)

        instance.metricsOption = Some(metrics)
      }
    }
  }

  /** Calculates in one pass over the data the features' mean and standard deviation.
    * For the calculation of the standard deviation with one pass over the data,
    * the Youngs & Cramer algorithm is used:
    * [[http://www.cs.yale.edu/publications/techreports/tr222.pdf]]
    *
    * The standard deviation is the population one (squared deviations divided by the count).
    * Features whose standard deviation is 0.0 get 1.0 instead, so that scaling does not
    * divide by zero.
    *
    * @param dataSet The data set for which we want to calculate mean and standard deviation
    * @return DataSet containing a single tuple of two vectors (meanVector, stdVector).
    *         The first vector represents the mean vector and the second is the standard
    *         deviation vector.
    */
  private def extractFeatureMetrics[T <: Vector](dataSet: DataSet[T])
    : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
    // Each partial result is (count, per-feature sum, per-feature sum of squared deviations
    // from the partial mean). A single observation has zero squared deviation.
    val metrics = dataSet.map{
      v => (1.0, v.asBreeze, linalg.Vector.zeros[Double](v.size))
    }.reduce{
      (metrics1, metrics2) => {
        /* Combine two partial results with formula 1.5b of the cited technical report.
         * With m = metrics1._1 and n = metrics2._1 observations, partial sums T1, T2 and
         * partial sums of squared deviations S1, S2:
         *   S = S1 + S2 + m / (n * (m + n)) * (n / m * T1 - T2)^2
         * temp1 is m / (n * (m + n)), temp2 is n / m.
         */
        val temp1 = metrics1._1 / (metrics2._1 * (metrics1._1 + metrics2._1))
        val temp2 = metrics2._1 / metrics1._1
        val tempVector = (metrics1._2 * temp2) - metrics2._2
        val tempS = (metrics1._3 + metrics2._3) + (tempVector :* tempVector) * temp1

        (metrics1._1 + metrics2._1, metrics1._2 + metrics2._2, tempS)
      }
    }.map{
      metric => {
        // sqrt(S / count) is the standard deviation (not the variance) of each feature.
        val stdVector = sqrt(metric._3 / metric._1)

        // Constant features would cause a division by zero when scaling; use 1.0 instead.
        for (i <- 0 until stdVector.size) {
          if (stdVector(i) == 0.0) {
            stdVector.update(i, 1.0)
          }
        }
        (metric._2 / metric._1, stdVector)
      }
    }
    metrics
  }

  /** Base class for StandardScaler's [[TransformOperation]]. This class has to be extended for
    * all types which are supported by [[StandardScaler]]'s transform operation.
    *
    * @tparam T type of the elements that are transformed
    */
  abstract class StandardScalerTransformOperation[T: TypeInformation: ClassTag]
    extends TransformOperation[
        StandardScaler,
        (linalg.Vector[Double], linalg.Vector[Double]),
        T,
        T] {

    /** Target mean, captured from the transform parameters in [[getModel]]. */
    var mean: Double = _
    /** Target standard deviation, captured from the transform parameters in [[getModel]]. */
    var std: Double = _

    /** Captures the target [[Mean]] and [[Std]] for [[scale]] and returns the learned
      * (meanVector, stdVector) metrics.
      *
      * @throws RuntimeException if the scaler has not been fitted
      */
    override def getModel(
        instance: StandardScaler,
        transformParameters: ParameterMap)
      : DataSet[(linalg.Vector[Double], linalg.Vector[Double])] = {
      mean = transformParameters(Mean)
      std = transformParameters(Std)

      instance.metricsOption match {
        case Some(metrics) => metrics
        case None =>
          throw new RuntimeException("The StandardScaler has not been fitted to the data. " +
            "This is necessary to estimate the mean and standard deviation of the data.")
      }
    }

    /** Standardizes `vector` with the learned metrics and rescales it to the target
      * mean and std: ((x - learnedMean) / learnedStd) * std + mean, element-wise.
      *
      * @param vector the vector to scale; it is not modified
      * @param model the learned (meanVector, stdVector)
      * @return a new scaled vector of the same type as `vector`
      */
    def scale[V <: Vector: BreezeVectorConverter](
        vector: V,
        model: (linalg.Vector[Double], linalg.Vector[Double]))
      : V = {
      val (broadcastMean, broadcastStd) = model
      // Use non-mutating operators: the Breeze view returned by `asBreeze` may share its
      // storage with `vector`, so in-place `-=` / `:/=` could overwrite the caller's input.
      val standardized = (vector.asBreeze - broadcastMean) :/ broadcastStd
      ((standardized :* std) + mean).fromBreeze
    }
  }

  /** [[TransformOperation]] to transform [[Vector]] types
    *
    * @tparam T vector type which is transformed
    * @return a transform operation that scales every vector
    */
  implicit def transformVectors[T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = {
    new StandardScalerTransformOperation[T]() {
      override def transform(
          vector: T,
          model: (linalg.Vector[Double], linalg.Vector[Double]))
        : T = {
        scale(vector, model)
      }
    }
  }

  /** [[TransformOperation]] to transform tuples of type ([[Vector]], [[Double]]). Only the
    * vector component is scaled; the double is passed through unchanged.
    *
    * @tparam T vector type of the first tuple component
    * @return a transform operation that scales the vector component of every tuple
    */
  implicit def transformTupleVectorDouble[
      T <: Vector: BreezeVectorConverter: TypeInformation: ClassTag] = {
    new StandardScalerTransformOperation[(T, Double)] {
      override def transform(
          element: (T, Double),
          model: (linalg.Vector[Double], linalg.Vector[Double]))
        : (T, Double) = {
        (scale(element._1, model), element._2)
      }
    }
  }

  /** [[TransformOperation]] to transform [[LabeledVector]]. The feature vector is scaled and
    * the label is kept unchanged.
    */
  implicit val transformLabeledVector = new StandardScalerTransformOperation[LabeledVector] {
    override def transform(
        element: LabeledVector,
        model: (linalg.Vector[Double], linalg.Vector[Double]))
      : LabeledVector = {
      val LabeledVector(label, vector) = element

      LabeledVector(label, scale(vector, model))
    }
  }
}