[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30779810 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._ * At the moment, the whole partition is used for SGD, making it effectively a batch gradient * descent. Once a sampling operator has been introduced, the algorithm can be optimized * - * @param runParameters The parameters to tune the algorithm. Currently these include: - * [[Solver.LossFunction]] for the loss function to be used, - * [[Solver.RegularizationType]] for the type of regularization, - * [[Solver.RegularizationParameter]] for the regularization parameter, + * The parameters to tune the algorithm are: + * [[Solver.LossFunctionParameter]] for the loss function to be used, + * [[Solver.RegularizationTypeParameter]] for the type of regularization, + * [[Solver.RegularizationValueParameter]] for the regularization parameter, --- End diff -- Do we want to append the Parameter suffix to the parameter names? So far we haven't done that. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
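The naming question above comes down to how a FlinkML-style `Parameter`/`ParameterMap` convention works: the singleton object itself names the parameter, so a `Parameter` suffix is redundant. A minimal, self-contained sketch of that convention (all names and the default value are illustrative, not the actual Flink code):

```scala
// Sketch of a scikit-learn-like parameter map: the object name *is* the
// parameter name, so no "Parameter" suffix is needed.
trait Parameter[T] { val defaultValue: Option[T] }

// Hypothetical parameter object; default chosen only for illustration
object LossFunction extends Parameter[String] {
  val defaultValue: Option[String] = Some("squaredLoss")
}

class ParameterMap {
  private var map = Map.empty[Parameter[_], Any]

  def add[T](parameter: Parameter[T], value: T): ParameterMap = {
    map += (parameter -> value)
    this
  }

  // apply fails loudly if neither a value nor a default is present
  def apply[T](parameter: Parameter[T]): T =
    map.get(parameter).map(_.asInstanceOf[T])
      .orElse(parameter.defaultValue)
      .getOrElse(throw new NoSuchElementException(s"No value set for $parameter"))

  // get returns an Option, useful for genuinely optional parameters
  def get[T](parameter: Parameter[T]): Option[T] =
    map.get(parameter).map(_.asInstanceOf[T]).orElse(parameter.defaultValue)
}
```

Under this shape the object reference in scaladoc links stays short, which is what the review comment argues for.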
[GitHub] flink pull request: From stream to h base
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/702#issuecomment-104170195 I like the idea of this, but we have to be aware that this breaks the existing streaming API by renaming the `writeToFile()` method. Since `writeToFile` is misleading anyway, I am in favor of this change.
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553820#comment-14553820 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30780043 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---

@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
     }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
+
+  /** Provides a solution for the given optimization problem
    *
    * @param data A Dataset of LabeledVector (label, features) pairs
-   * @param initWeights The initial weights that will be optimized
+   * @param initialWeights The initial weights that will be optimized
    * @return The weights, optimized for the provided data.
    */
   override def optimize(
     data: DataSet[LabeledVector],
-    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-    // TODO: Faster way to do this?
-    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-    val numberOfIterations: Int = parameterMap(Iterations)
+    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+    val numberOfIterations: Int = parameters(Iterations)
+    // TODO(tvas): This looks out of place, why don't we get back an Option from
+    // parameters(ConvergenceThreshold)?
+    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)

     // Initialize weights
-    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-      // Ensure provided weight vector is a DenseVector
-      case Some(wvDS) => {
-        wvDS.map{wv => {
-          val denseWeights = wv.weights match {
-            case dv: DenseVector => dv
-            case sv: SparseVector => sv.toDenseVector
+    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
+
+    // Perform the iterations
+    val optimizedWeights = convergenceThresholdOption match {
+      // No convergence criterion
+      case None =>
+        initialWeightsDS.iterate(numberOfIterations) {
+          weightVectorDS => {
+            SGDStep(data, weightVectorDS)
           }
-          WeightVector(denseWeights, wv.intercept)
         }
-
+      case Some(convergence) =>
+        /** Calculates the regularized loss, from the data and given weights **/

--- End diff -- One-line comments should use `//`.

Add convergence criterion to SGD optimizer -- Key: FLINK-1992 URL: https://issues.apache.org/jira/browse/FLINK-1992 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Priority: Minor Labels: ML Fix For: 0.9 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It would be good to support a dynamic convergence criterion, too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
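The criterion being added splits the iteration into two paths: a fixed-iteration loop when no threshold is set, and a loss-monitored loop otherwise. A local, non-distributed sketch of that termination logic, under the assumption that convergence is judged by the relative change in loss between iterations (names are illustrative, not the Flink API):

```scala
// Run up to maxIterations SGD steps, stopping early when the relative
// change in loss drops below the (optional) convergence threshold.
// sgdStep stands in for one distributed SGD step; it returns the updated
// weights and the regularized loss for those weights.
def optimizeLocally(
    initialWeights: Vector[Double],
    sgdStep: Vector[Double] => (Vector[Double], Double),
    maxIterations: Int,
    convergenceThreshold: Option[Double]): Vector[Double] = {
  var weights = initialWeights
  var previousLoss = Double.PositiveInfinity
  var iteration = 0
  var converged = false
  while (iteration < maxIterations && !converged) {
    val (newWeights, loss) = sgdStep(weights)
    // With no threshold configured this is always false, i.e. the plain
    // fixed-iteration behavior of the `case None` branch above.
    converged = convergenceThreshold.exists { threshold =>
      math.abs(previousLoss - loss) / previousLoss < threshold
    }
    previousLoss = loss
    weights = newWeights
    iteration += 1
  }
  weights
}
```

The distributed version in the diff expresses the same check via `iterate`/`iterateWithTermination`-style constructs rather than a local loop.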
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553830#comment-14553830 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30780327 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---

@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
     }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
+
+  /** Provides a solution for the given optimization problem
    *
    * @param data A Dataset of LabeledVector (label, features) pairs
-   * @param initWeights The initial weights that will be optimized
+   * @param initialWeights The initial weights that will be optimized
    * @return The weights, optimized for the provided data.
    */
   override def optimize(
     data: DataSet[LabeledVector],
-    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-    // TODO: Faster way to do this?
-    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-    val numberOfIterations: Int = parameterMap(Iterations)
+    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+    val numberOfIterations: Int = parameters(Iterations)
+    // TODO(tvas): This looks out of place, why don't we get back an Option from
+    // parameters(ConvergenceThreshold)?
+    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)

     // Initialize weights
-    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-      // Ensure provided weight vector is a DenseVector
-      case Some(wvDS) => {
-        wvDS.map{wv => {
-          val denseWeights = wv.weights match {
-            case dv: DenseVector => dv
-            case sv: SparseVector => sv.toDenseVector
+    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
+
+    // Perform the iterations
+    val optimizedWeights = convergenceThresholdOption match {
+      // No convergence criterion
+      case None =>
+        initialWeightsDS.iterate(numberOfIterations) {
+          weightVectorDS => {
+            SGDStep(data, weightVectorDS)
          }
-          WeightVector(denseWeights, wv.intercept)
        }
-
+      case Some(convergence) =>
+        /** Calculates the regularized loss, from the data and given weights **/
+        def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
+        DataSet[Double] = {
+          data.map {
+            new LossCalculation
+          }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+        .reduce {
+          (left, right) =>
+            val (leftLoss, leftCount) = left
+            val (rightLoss, rightCount) = right
+            (leftLoss + rightLoss, rightCount + leftCount)
+        }
+        .map{new RegularizedLossCalculation}
+        .withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)

--- End diff -- Indentation seems to be off.
[jira] [Commented] (FLINK-2050) Add pipelining mechanism for chainable transformers and estimators
[ https://issues.apache.org/jira/browse/FLINK-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553840#comment-14553840 ] Theodore Vasiloudis commented on FLINK-2050: [~till.rohrmann] I'm going through the code now, could you take a look at the documentation when you get the chance? Add pipelining mechanism for chainable transformers and estimators -- Key: FLINK-2050 URL: https://issues.apache.org/jira/browse/FLINK-2050 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Till Rohrmann Labels: ML Fix For: 0.9 The key concept of an easy-to-use ML library is the quick and simple construction of data analysis pipelines. Scikit-learn's approach to defining transformers and estimators seems to be a really good solution to this problem. I propose to follow a similar path, because it makes FlinkML flexible in terms of code reuse, as well as easy to use for people coming from Scikit-learn.
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30781135 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala ---

@@ -40,6 +44,32 @@ abstract class Solver extends Serializable with WithParameters {
     data: DataSet[LabeledVector],
     initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector]

+  /** Creates initial weights vector, creating a DataSet with a WeightVector element
+    *
+    * @param initialWeights An Option that may contain an initial set of weights
+    * @param data The data for which we optimize the weights
+    * @return A DataSet containing a single WeightVector element
+    */
+  def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]],
+      data: DataSet[LabeledVector]): DataSet[WeightVector] = {
+    // TODO: Faster way to do this?
+    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
+
+    initialWeights match {
+      // Ensure provided weight vector is a DenseVector
+      case Some(wvDS) =>
+        wvDS.map { wv => {
+          val denseWeights = wv.weights match {
+            case dv: DenseVector => dv
+            case sv: SparseVector => sv.toDenseVector
+          }
+            WeightVector(denseWeights, wv.intercept)

--- End diff -- Indentation one level off.
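The `createInitialWeightsDS` helper under review normalizes any user-supplied weight vector to a dense representation so the optimizer has a single code path. A simplified, self-contained sketch of that pattern, with stand-in vector types rather than Flink ML's actual `DenseVector`/`SparseVector`:

```scala
// Stand-in vector types; the real Flink ML types live in org.apache.flink.ml.math
sealed trait Vec { def size: Int }
case class DenseVec(data: Array[Double]) extends Vec { def size: Int = data.length }
case class SparseVec(size: Int, indices: Array[Int], values: Array[Double]) extends Vec {
  // Densify: write each stored (index, value) pair into a zero-filled array
  def toDense: DenseVec = {
    val data = new Array[Double](size)
    indices.zip(values).foreach { case (i, v) => data(i) = v }
    DenseVec(data)
  }
}
case class WeightVec(weights: DenseVec, intercept: Double)

// Ensure the initial weights are dense; fall back to zeros of the data's
// dimension when the caller provided none (as the diff does via dimensionsDS).
def createInitialWeights(initial: Option[Vec], dimension: Int): WeightVec =
  initial match {
    case Some(dv: DenseVec)  => WeightVec(dv, 0.0)
    case Some(sv: SparseVec) => WeightVec(sv.toDense, 0.0)
    case None                => WeightVec(DenseVec(new Array[Double](dimension)), 0.0)
  }
```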
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553851#comment-14553851 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30781085 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala ---

@@ -95,6 +95,7 @@ abstract class DiffRegularization extends Regularization {
     regParameter: Double)
 }

+// TODO(tvas): I think NoRegularization should extend DiffRegularization

--- End diff -- Why do you think?
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553863#comment-14553863 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30781429 --- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala ---

@@ -205,6 +190,61 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase {
   }

-  // TODO: Need more corner cases
+  it should "terminate early if the convergence criterion is reached" in {
+    // TODO(tvas): We need a better way to check the convergence of the weights.
+    // Ideally we want to have a Breeze-like system, where the optimizers carry a history and that
+    // can tell us whether we have converged and at which iteration
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val sgdEarlyTerminate = GradientDescent()
+      .setConvergenceThreshold(1e2)
+      .setStepsize(1.0)
+      .setIterations(800)
+      .setLossFunction(SquaredLoss())
+      .setRegularizationType(NoRegularization())
+      .setRegularizationParameter(0.0)
+
+    val inputDS = env.fromCollection(data)
+
+    val weightDSEarlyTerminate = sgdEarlyTerminate.optimize(inputDS, None)
+
+    val weightListEarly: Seq[WeightVector] = weightDSEarlyTerminate.collect()
+
+    weightListEarly.size should equal(1)
+
+    val weightVectorEarly: WeightVector = weightListEarly.head
+    val weightsEarly = weightVectorEarly.weights.asInstanceOf[DenseVector].data
+    val weight0Early = weightVectorEarly.intercept
+
+    val sgdNoConvergence = GradientDescent()
+      .setStepsize(1.0)
+      .setIterations(800)
+      .setLossFunction(SquaredLoss())
+      .setRegularizationType(NoRegularization())
+      .setRegularizationParameter(0.0)
+
+

--- End diff -- two line breaks.
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30781414 --- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala ---

@@ -205,6 +190,61 @@ class GradientDescentITSuite extends FlatSpec with Matchers with FlinkTestBase {
   }

-  // TODO: Need more corner cases
+  it should "terminate early if the convergence criterion is reached" in {
+    // TODO(tvas): We need a better way to check the convergence of the weights.
+    // Ideally we want to have a Breeze-like system, where the optimizers carry a history and that
+    // can tell us whether we have converged and at which iteration
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    env.setParallelism(2)
+
+    val sgdEarlyTerminate = GradientDescent()
+      .setConvergenceThreshold(1e2)
+      .setStepsize(1.0)
+      .setIterations(800)
+      .setLossFunction(SquaredLoss())
+      .setRegularizationType(NoRegularization())
+      .setRegularizationParameter(0.0)
+
+    val inputDS = env.fromCollection(data)
+
+    val weightDSEarlyTerminate = sgdEarlyTerminate.optimize(inputDS, None)
+
+    val weightListEarly: Seq[WeightVector] = weightDSEarlyTerminate.collect()
+
+    weightListEarly.size should equal(1)
+
+    val weightVectorEarly: WeightVector = weightListEarly.head
+    val weightsEarly = weightVectorEarly.weights.asInstanceOf[DenseVector].data
+    val weight0Early = weightVectorEarly.intercept
+
+    val sgdNoConvergence = GradientDescent()
+      .setStepsize(1.0)

--- End diff -- If this solves the problem, then go for it.
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553874#comment-14553874 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/692#issuecomment-104179672 Great work @thvasilo. I had some minor comments.
[GitHub] flink pull request: [Flink-1985] Streaming does not correctly forw...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/682#discussion_r30782105 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---

@@ -93,6 +96,15 @@ public JobGraph createJobGraph(String jobName) {
 	configureCheckpointing();

+	try {
+		InstantiationUtil.writeObjectToConfig(
+			this.streamGraph.getExecutionConfig(),
+			this.jobGraph.getJobConfiguration(),
+			ExecutionConfig.CONFIG_KEY);
+	} catch (IOException e) {
+		throw new RuntimeException("Config object could not be written to Job Configuration: " + e);
+	}

--- End diff -- Can you replace the `+` with a `,`? `Throwable.toString()` does not really give useful output.
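The review point is about exception chaining: concatenating `e` into the message keeps only `Throwable.toString()`, while passing it as the constructor's cause preserves the full stack trace. A small sketch of the suggested pattern (illustrative only, not the actual Flink code):

```scala
// Wrap a write action, chaining any IOException as the cause of the
// RuntimeException ("," in Java) rather than concatenating it into the
// message ("+"), so the original stack trace survives for debugging.
def writeConfig(write: () => Unit): Unit =
  try {
    write()
  } catch {
    case e: java.io.IOException =>
      throw new RuntimeException(
        "Config object could not be written to Job Configuration", e)
  }
```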
[jira] [Commented] (FLINK-2003) Building on some encrypted filesystems leads to File name too long error
[ https://issues.apache.org/jira/browse/FLINK-2003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553880#comment-14553880 ] ASF GitHub Bot commented on FLINK-2003: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/690#issuecomment-104181351 I would actually say into both the README and the "how to contribute" section.

Building on some encrypted filesystems leads to "File name too long" error -- Key: FLINK-2003 URL: https://issues.apache.org/jira/browse/FLINK-2003 Project: Flink Issue Type: Bug Components: Build System Reporter: Theodore Vasiloudis Priority: Minor Labels: build, starter

The class names generated by the build system can be too long. Creating overly long filenames is not possible on some encrypted filesystems, including encfs, which is what Ubuntu uses. This is the same as this [Spark issue|https://issues.apache.org/jira/browse/SPARK-4820]. The workaround (taken from the linked issue) is to add the following to the compile options in Maven:

{code}
<arg>-Xmax-classfile-name</arg>
<arg>128</arg>
{code}

And in SBT add:

{code}
scalacOptions in Compile ++= Seq("-Xmax-classfile-name", "128"),
{code}
[jira] [Commented] (FLINK-2050) Add pipelining mechanism for chainable transformers and estimators
[ https://issues.apache.org/jira/browse/FLINK-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553896#comment-14553896 ] ASF GitHub Bot commented on FLINK-2050: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/704#discussion_r30783065 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala ---

@@ -22,38 +22,47 @@
 import breeze.linalg
 import breeze.numerics.sqrt
 import breeze.numerics.sqrt._
 import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
 import org.apache.flink.ml.math.Breeze._
-import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
+import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation, Transformer}
 import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}

+import scala.reflect.ClassTag
+
 /** Scales observations, so that all features have a user-specified mean and standard deviation.
   * By default for [[StandardScaler]] transformer mean=0.0 and std=1.0.
   *
-  * This transformer takes a [[Vector]] of values and maps it to a
-  * scaled [[Vector]] such that each feature has a user-specified mean and standard deviation.
+  * This transformer takes a subtype of [[Vector]] of values and maps it to a
+  * scaled subtype of [[Vector]] such that each feature has a user-specified mean and standard
+  * deviation.
   *
   * This transformer can be prepended to all [[Transformer]] and
-  * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of
-  * [[Vector]].
+  * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect as input a subtype
+  * of [[Vector]].
   *
   * @example
   * {{{
   *   val trainingDS: DataSet[Vector] = env.fromCollection(data)
   *   val transformer = StandardScaler().setMean(10.0).setStd(2.0)
   *
-  *   transformer.transform(trainingDS)
+  *   transformer.fit(trainingDS)
+  *   val transformedDS = transformer.transform(trainingDS)
   * }}}
   *
   * =Parameters=
   *
-  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by default equal to 0
-  * - [[StandardScaler.Std]]: The standard deviation of the transformed data set; by default
+  * - [[Mean]]: The mean value of transformed data set; by default equal to 0
+  * - [[Std]]: The standard deviation of the transformed data set; by default
   *   equal to 1

--- End diff -- Why use just the top-level type here, but the fully qualified one in the ALS docstring?
[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs
[ https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553894#comment-14553894 ] ASF GitHub Bot commented on FLINK-2034: --- Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30783024 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink --- End diff -- maybe we should add a headline that makes clear that this is a roadmap Add vision and roadmap for ML library to docs - Key: FLINK-2034 URL: https://issues.apache.org/jira/browse/FLINK-2034 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Theodore Vasiloudis Assignee: Theodore Vasiloudis Labels: ML Fix For: 0.9 We should have a document describing the vision of the Machine Learning library in Flink and an up to date roadmap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r3078 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? --- End diff -- the `apply` method unpacks the option value, whereas the `get` method returns an `Option`. Maybe we should rename it into `getOption`.
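The `apply`-versus-`get` distinction discussed in this thread can be shown with a deliberately simplified parameter map (hypothetical `MiniParameterMap`, not Flink's actual `ParameterMap` class): `apply` unwraps the value and fails loudly if the parameter is unset, while `get` returns an `Option` -- hence the suggestion to rename it to `getOption`.

```scala
// Hypothetical, minimal illustration of the behaviour described above;
// Flink's real ParameterMap also carries typed Parameter keys and defaults.
class MiniParameterMap {
  private var values = Map.empty[String, Any]

  def add(key: String, value: Any): this.type = {
    values += key -> value
    this
  }

  /** Returns Some(value) if the parameter was set, None otherwise. */
  def get[T](key: String): Option[T] =
    values.get(key).map(_.asInstanceOf[T])

  /** Unwraps the value directly; throws if the parameter was never set. */
  def apply[T](key: String): T =
    get[T](key).getOrElse(
      throw new NoSuchElementException(s"Parameter $key is not set"))
}
```

With this shape, `parameters(Iterations)` reads naturally for required parameters, while `parameters.get(ConvergenceThreshold)` is the right call for the optional threshold in `optimize`.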
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553817#comment-14553817 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r3078 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? --- End diff -- the `apply` method unpacks the option value, whereas the `get` method returns an `Option`. Maybe we should rename it into `getOption`. Add convergence criterion to SGD optimizer -- Key: FLINK-1992 URL: https://issues.apache.org/jira/browse/FLINK-1992 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Priority: Minor Labels: ML Fix For: 0.9 Currently, Flink's SGD optimizer runs for a fixed number of iterations. 
It would be good to support a dynamic convergence criterion, too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
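The dynamic convergence criterion requested in FLINK-1992 can be sketched as a relative-change test on the regularized loss (the exact formula is an assumption here; the quoted diff only shows a guard on a non-positive previous loss):

```scala
// Sketch of a relative-change convergence criterion for SGD
// (assumed form; the exact test in Flink's optimizer may differ):
// stop once the relative improvement of the loss falls below the
// user-supplied threshold.
def converged(previousLoss: Double, currentLoss: Double, threshold: Double): Boolean = {
  // Guard: with a non-positive previous loss the relative change is
  // undefined, so keep iterating (mirrors the guard in the quoted diff).
  if (previousLoss <= 0) false
  else math.abs(previousLoss - currentLoss) / previousLoss < threshold
}
```

In the PR this predicate decides whether the `iterateWithTermination` step returns an empty termination data set, which ends the iteration early.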
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30780043 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold) // Initialize weights -val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) = { -wvDS.map{wv = { - val denseWeights = wv.weights match { -case dv: DenseVector = dv -case sv: SparseVector = sv.toDenseVector +val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + +// Perform the iterations +val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None = +initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS = { +SGDStep(data, weightVectorDS) } - WeightVector(denseWeights, wv.intercept) } - + case Some(convergence) = +/** Calculates the regularized loss, from the data and given weights **/ --- End diff -- One line comments should use `//` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: From stream to h base
Github user fhueske commented on the pull request: https://github.com/apache/flink/pull/702#issuecomment-104171609 How about adding a `writeToFile()` method that accepts a `FileOutputFormat`? This would only break code where the method is used with a non-FileOutputFormat.
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30780392 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold) // Initialize weights -val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) = { -wvDS.map{wv = { - val denseWeights = wv.weights match { -case dv: DenseVector = dv -case sv: SparseVector = sv.toDenseVector +val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + +// Perform the iterations +val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None = +initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS = { +SGDStep(data, weightVectorDS) } - WeightVector(denseWeights, wv.intercept) } - + case Some(convergence) = +/** Calculates the regularized loss, from the data and given weights **/ +def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]): +DataSet[Double] = { + data.map { +new LossCalculation + }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) +.reduce { +(left, right) = + val (leftLoss, leftCount) = left + val (rightLoss, rightCount) = right + (leftLoss + rightLoss, rightCount + leftCount) + } +.map{new RegularizedLossCalculation} +.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) } - } - case None = createInitialWeightVector(dimensionsDS) -} - -// Perform the iterations -// TODO: Enable convergence stopping criterion, as in Multiple Linear regression -initialWeightsDS.iterate(numberOfIterations) { - weightVector = { -SGDStep(data, weightVector) - } +// We have to calculate for each weight vector the sum of squared residuals, +// and then sum them and apply regularization +val initialLossSumDS = lossCalculation(data, initialWeightsDS) + +// Combine weight vector with the current loss +val initialWeightsWithLossSum = initialWeightsDS. + crossWithTiny(initialLossSumDS).setParallelism(1) + +val resultWithLoss = initialWeightsWithLossSum. 
+ iterateWithTermination(numberOfIterations) { + weightsWithLossSum = + +// Extract weight vector and loss +val previousWeightsDS = weightsWithLossSum.map{_._1} +val previousLossSumDS = weightsWithLossSum.map{_._2} + +val currentWeightsDS = SGDStep(data, previousWeightsDS) + +val currentLossSumDS = lossCalculation(data, currentWeightsDS) + +// Check if the relative change in the loss is smaller than the +// convergence threshold. If yes, then terminate i.e. return empty termination data set +val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1). + filter{ + pair = { +val (previousLoss, currentLoss) = pair + +if (previousLoss = 0) { + false +}
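The `iterateWithTermination` pattern quoted in the diffs above can be mimicked with a plain-Scala loop (a mock for illustration, not Flink's distributed iteration): each step produces the next solution plus a "termination" collection, and the iteration stops early once that collection is empty -- which is how the convergence check in the PR ends the SGD loop.

```scala
// Plain-Scala mock of the iterateWithTermination pattern: the step
// function returns (next state, termination collection); iteration
// stops when the termination collection is empty or maxIterations hit.
def iterateWithTermination[T](initial: T, maxIterations: Int)
                             (step: T => (T, Seq[Unit])): T = {
  var state = initial
  var i = 0
  var keepGoing = true
  while (i < maxIterations && keepGoing) {
    val (next, termination) = step(state)
    state = next
    keepGoing = termination.nonEmpty
    i += 1
  }
  state
}
```

For example, a Newton iteration for sqrt(2) with a relative-change stopping rule converges in a handful of steps instead of exhausting the iteration budget.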
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30780559 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold) // Initialize weights -val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) = { -wvDS.map{wv = { - val denseWeights = wv.weights match { -case dv: DenseVector = dv -case sv: SparseVector = sv.toDenseVector +val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + +// Perform the iterations +val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None = +initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS = { +SGDStep(data, weightVectorDS) } - WeightVector(denseWeights, wv.intercept) } - + case Some(convergence) = +/** Calculates the regularized loss, from the data and given weights **/ +def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]): +DataSet[Double] = { + data.map { +new LossCalculation + }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) +.reduce { +(left, right) = + val (leftLoss, leftCount) = left + val (rightLoss, rightCount) = right + (leftLoss + rightLoss, rightCount + leftCount) + } +.map{new RegularizedLossCalculation} +.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) } - } - case None = createInitialWeightVector(dimensionsDS) -} - -// Perform the iterations -// TODO: Enable convergence stopping criterion, as in Multiple Linear regression -initialWeightsDS.iterate(numberOfIterations) { - weightVector = { -SGDStep(data, weightVector) - } +// We have to calculate for each weight vector the sum of squared residuals, +// and then sum them and apply regularization +val initialLossSumDS = lossCalculation(data, initialWeightsDS) + +// Combine weight vector with the current loss +val initialWeightsWithLossSum = initialWeightsDS. + crossWithTiny(initialLossSumDS).setParallelism(1) + +val resultWithLoss = initialWeightsWithLossSum. 
+ iterateWithTermination(numberOfIterations) { + weightsWithLossSum = + +// Extract weight vector and loss +val previousWeightsDS = weightsWithLossSum.map{_._1} +val previousLossSumDS = weightsWithLossSum.map{_._2} + +val currentWeightsDS = SGDStep(data, previousWeightsDS) + +val currentLossSumDS = lossCalculation(data, currentWeightsDS) + +// Check if the relative change in the loss is smaller than the +// convergence threshold. If yes, then terminate i.e. return empty termination data set +val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1). + filter{ + pair = { +val (previousLoss, currentLoss) = pair + +if (previousLoss = 0) { + false +}
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553835#comment-14553835 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30780614 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold) // Initialize weights -val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) = { -wvDS.map{wv = { - val denseWeights = wv.weights match { -case dv: DenseVector = dv -case sv: SparseVector = sv.toDenseVector +val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + +// Perform the iterations +val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None = +initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS = { +SGDStep(data, weightVectorDS) } - WeightVector(denseWeights, wv.intercept) } - + case Some(convergence) = +/** Calculates the regularized loss, from the data and given weights **/ +def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]): +DataSet[Double] = { + data.map { +new LossCalculation + }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) +.reduce { +(left, right) = + val (leftLoss, leftCount) = left + val (rightLoss, rightCount) = right + (leftLoss + rightLoss, rightCount + leftCount) + } +.map{new RegularizedLossCalculation} +.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) } - } - case None = createInitialWeightVector(dimensionsDS) -} - -// Perform the iterations -// TODO: Enable convergence stopping criterion, as in Multiple Linear regression -initialWeightsDS.iterate(numberOfIterations) { - weightVector = { -SGDStep(data, weightVector) - } +// We have to calculate for each weight vector the sum of squared residuals, +// and then sum them and apply regularization +val initialLossSumDS = lossCalculation(data, initialWeightsDS) + +// Combine weight vector with the current loss +val initialWeightsWithLossSum = initialWeightsDS. + crossWithTiny(initialLossSumDS).setParallelism(1) + +val resultWithLoss = initialWeightsWithLossSum. 
+ iterateWithTermination(numberOfIterations) { + weightsWithLossSum = + +// Extract weight vector and loss +val previousWeightsDS = weightsWithLossSum.map{_._1} +val previousLossSumDS = weightsWithLossSum.map{_._2} + +val currentWeightsDS = SGDStep(data, previousWeightsDS) + +val currentLossSumDS = lossCalculation(data, currentWeightsDS) + +// Check if the relative change in the loss is smaller than the +// convergence threshold. If yes, then terminate i.e. return empty termination data set +val termination =
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30780625 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold) // Initialize weights -val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) = { -wvDS.map{wv = { - val denseWeights = wv.weights match { -case dv: DenseVector = dv -case sv: SparseVector = sv.toDenseVector +val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + +// Perform the iterations +val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None = +initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS = { +SGDStep(data, weightVectorDS) } - WeightVector(denseWeights, wv.intercept) } - + case Some(convergence) = +/** Calculates the regularized loss, from the data and given weights **/ +def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]): +DataSet[Double] = { + data.map { +new LossCalculation + }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) +.reduce { +(left, right) = + val (leftLoss, leftCount) = left + val (rightLoss, rightCount) = right + (leftLoss + rightLoss, rightCount + leftCount) + } +.map{new RegularizedLossCalculation} +.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) } - } - case None = createInitialWeightVector(dimensionsDS) -} - -// Perform the iterations -// TODO: Enable convergence stopping criterion, as in Multiple Linear regression -initialWeightsDS.iterate(numberOfIterations) { - weightVector = { -SGDStep(data, weightVector) - } +// We have to calculate for each weight vector the sum of squared residuals, +// and then sum them and apply regularization +val initialLossSumDS = lossCalculation(data, initialWeightsDS) + +// Combine weight vector with the current loss +val initialWeightsWithLossSum = initialWeightsDS. + crossWithTiny(initialLossSumDS).setParallelism(1) + +val resultWithLoss = initialWeightsWithLossSum. 
+ iterateWithTermination(numberOfIterations) { + weightsWithLossSum = + +// Extract weight vector and loss +val previousWeightsDS = weightsWithLossSum.map{_._1} +val previousLossSumDS = weightsWithLossSum.map{_._2} + +val currentWeightsDS = SGDStep(data, previousWeightsDS) + +val currentLossSumDS = lossCalculation(data, currentWeightsDS) + +// Check if the relative change in the loss is smaller than the +// convergence threshold. If yes, then terminate i.e. return empty termination data set +val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1). + filter{ + pair = { +val (previousLoss, currentLoss) = pair + +if (previousLoss = 0) { + false +}
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30780614 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold) // Initialize weights -val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) = { -wvDS.map{wv = { - val denseWeights = wv.weights match { -case dv: DenseVector = dv -case sv: SparseVector = sv.toDenseVector +val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + +// Perform the iterations +val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None = +initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS = { +SGDStep(data, weightVectorDS) } - WeightVector(denseWeights, wv.intercept) } - + case Some(convergence) = +/** Calculates the regularized loss, from the data and given weights **/ +def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]): +DataSet[Double] = { + data.map { +new LossCalculation + }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) +.reduce { +(left, right) = + val (leftLoss, leftCount) = left + val (rightLoss, rightCount) = right + (leftLoss + rightLoss, rightCount + leftCount) + } +.map{new RegularizedLossCalculation} +.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) } - } - case None = createInitialWeightVector(dimensionsDS) -} - -// Perform the iterations -// TODO: Enable convergence stopping criterion, as in Multiple Linear regression -initialWeightsDS.iterate(numberOfIterations) { - weightVector = { -SGDStep(data, weightVector) - } +// We have to calculate for each weight vector the sum of squared residuals, +// and then sum them and apply regularization +val initialLossSumDS = lossCalculation(data, initialWeightsDS) + +// Combine weight vector with the current loss +val initialWeightsWithLossSum = initialWeightsDS. + crossWithTiny(initialLossSumDS).setParallelism(1) + +val resultWithLoss = initialWeightsWithLossSum. 
+ iterateWithTermination(numberOfIterations) { + weightsWithLossSum = + +// Extract weight vector and loss +val previousWeightsDS = weightsWithLossSum.map{_._1} +val previousLossSumDS = weightsWithLossSum.map{_._2} + +val currentWeightsDS = SGDStep(data, previousWeightsDS) + +val currentLossSumDS = lossCalculation(data, currentWeightsDS) + +// Check if the relative change in the loss is smaller than the +// convergence threshold. If yes, then terminate i.e. return empty termination data set +val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1). + filter{ + pair = { +val (previousLoss, currentLoss) = pair + +if (previousLoss = 0) { + false +}
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553834#comment-14553834 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30780559 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold) // Initialize weights -val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) = { -wvDS.map{wv = { - val denseWeights = wv.weights match { -case dv: DenseVector = dv -case sv: SparseVector = sv.toDenseVector +val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + +// Perform the iterations +val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None = +initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS = { +SGDStep(data, weightVectorDS) } - WeightVector(denseWeights, wv.intercept) } - + case Some(convergence) = +/** Calculates the regularized loss, from the data and given weights **/ +def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]): +DataSet[Double] = { + data.map { +new LossCalculation + }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) +.reduce { +(left, right) = + val (leftLoss, leftCount) = left + val (rightLoss, rightCount) = right + (leftLoss + rightLoss, rightCount + leftCount) + } +.map{new RegularizedLossCalculation} +.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) } - } - case None = createInitialWeightVector(dimensionsDS) -} - -// Perform the iterations -// TODO: Enable convergence stopping criterion, as in Multiple Linear regression -initialWeightsDS.iterate(numberOfIterations) { - weightVector = { -SGDStep(data, weightVector) - } +// We have to calculate for each weight vector the sum of squared residuals, +// and then sum them and apply regularization +val initialLossSumDS = lossCalculation(data, initialWeightsDS) + +// Combine weight vector with the current loss +val initialWeightsWithLossSum = initialWeightsDS. + crossWithTiny(initialLossSumDS).setParallelism(1) + +val resultWithLoss = initialWeightsWithLossSum. 
+ iterateWithTermination(numberOfIterations) { + weightsWithLossSum = + +// Extract weight vector and loss +val previousWeightsDS = weightsWithLossSum.map{_._1} +val previousLossSumDS = weightsWithLossSum.map{_._2} + +val currentWeightsDS = SGDStep(data, previousWeightsDS) + +val currentLossSumDS = lossCalculation(data, currentWeightsDS) + +// Check if the relative change in the loss is smaller than the +// convergence threshold. If yes, then terminate i.e. return empty termination data set +val termination =
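The diff above wires the convergence check into Flink's `iterateWithTermination`: each iteration carries the weight vector together with the previous loss sum, and the iteration terminates when the relative change in loss drops below the threshold. The following is a minimal, Flink-free sketch of that control flow on a one-dimensional least-squares problem — plain Scala collections stand in for `DataSet`s, and `loss`, `sgdStep`, and `optimize` are simplified stand-ins, not FlinkML's actual API.

```scala
// Mean squared loss of a linear model y ≈ w * x over the data.
def loss(data: Seq[(Double, Double)], w: Double): Double =
  data.map { case (x, y) => val r = w * x - y; r * r }.sum / data.size

// One batch gradient step (the "whole partition" SGD noted in the scaladoc).
def sgdStep(data: Seq[(Double, Double)], w: Double, stepsize: Double): Double = {
  val grad = data.map { case (x, y) => 2.0 * (w * x - y) * x }.sum / data.size
  w - stepsize * grad
}

// Mirrors the diff: with no threshold, run the full iteration budget;
// with Some(threshold), stop early when the relative loss change is small.
def optimize(data: Seq[(Double, Double)],
             maxIterations: Int,
             convergenceThreshold: Option[Double],
             stepsize: Double): (Double, Int) = {
  var w = 0.0
  var previousLoss = loss(data, w)
  var converged = false
  var i = 0
  while (i < maxIterations && !converged) {
    w = sgdStep(data, w, stepsize)
    val currentLoss = loss(data, w)
    // Termination criterion from the diff: relative change in the loss.
    converged = convergenceThreshold.exists { t =>
      math.abs(previousLoss - currentLoss) / previousLoss < t
    }
    previousLoss = currentLoss
    i += 1
  }
  (w, i)
}
```

Without a threshold this behaves like the plain `iterate` branch (fixed number of iterations); with one, it returns as soon as the loss plateaus relative to the threshold, which is exactly what the empty termination data set signals in the Flink version.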
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30781162 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala --- @@ -106,7 +139,7 @@ object Solver { * See [[https://en.wikipedia.org/wiki/Iterative_method Iterative Methods on Wikipedia]] for more * info */ -abstract class IterativeSolver extends Solver { +abstract class IterativeSolver() extends Solver() { --- End diff -- Do we need the empty parenthesis? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30781214 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala --- @@ -118,6 +151,44 @@ abstract class IterativeSolver extends Solver { parameters.add(Stepsize, stepsize) this } + + def setConvergenceThreshold(convergenceThreshold: Double): IterativeSolver = { +parameters.add(ConvergenceThreshold, convergenceThreshold) +this + } + + /** Mapping function that calculates the weight gradients from the data. +* +*/ + protected class GradientCalculation extends --- End diff -- extends in next line with 4 spaces indentation if line break is necessary here.
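The `setConvergenceThreshold` setter in the diff above follows the fluent-builder pattern used throughout FlinkML: each setter records a value in the solver's `ParameterMap` and returns `this` so calls can be chained. Here is a simplified, Flink-free sketch of that pattern — this `ParameterMap` with string keys is a hypothetical stand-in for the real `org.apache.flink.ml.common.ParameterMap`, which keys on typed `Parameter` objects.

```scala
// Minimal stand-in for FlinkML's ParameterMap (real one uses typed Parameter keys).
class ParameterMap {
  private val params = scala.collection.mutable.Map.empty[String, Any]
  def add(key: String, value: Any): Unit = params(key) = value
  // Returns None when a parameter was never set — which is why the
  // GradientDescent diff can pattern-match on an Option for the threshold.
  def get[T](key: String): Option[T] = params.get(key).map(_.asInstanceOf[T])
}

class IterativeSolver {
  val parameters = new ParameterMap
  // Each setter returns `this` to allow fluent chaining.
  def setStepsize(stepsize: Double): IterativeSolver = {
    parameters.add("Stepsize", stepsize); this
  }
  def setIterations(iterations: Int): IterativeSolver = {
    parameters.add("Iterations", iterations); this
  }
  def setConvergenceThreshold(threshold: Double): IterativeSolver = {
    parameters.add("ConvergenceThreshold", threshold); this
  }
}

val sgd = new IterativeSolver()
  .setStepsize(0.1)
  .setIterations(100)
  .setConvergenceThreshold(0.001)
```

Because the convergence threshold is optional, the solver treats its absence (`None`) as "iterate for the fixed budget" rather than requiring a sentinel value.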
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553854#comment-14553854 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30781162 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala --- @@ -106,7 +139,7 @@ object Solver { * See [[https://en.wikipedia.org/wiki/Iterative_method Iterative Methods on Wikipedia]] for more * info */ -abstract class IterativeSolver extends Solver { +abstract class IterativeSolver() extends Solver() { --- End diff -- Do we need the empty parenthesis? Add convergence criterion to SGD optimizer -- Key: FLINK-1992 URL: https://issues.apache.org/jira/browse/FLINK-1992 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Priority: Minor Labels: ML Fix For: 0.9 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It would be good to support a dynamic convergence criterion, too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: Some fixes for Scala type analysis
Github user vasia commented on the pull request: https://github.com/apache/flink/pull/669#issuecomment-104174485 Hey @aljoscha, it seems this one is needed for the Gelly Scala API :-) Shall I go ahead and merge it?
[GitHub] flink pull request: [Flink-1985] Streaming does not correctly forw...
Github user rmetzger commented on a diff in the pull request: https://github.com/apache/flink/pull/682#discussion_r30782073 --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java --- @@ -93,6 +96,15 @@ public JobGraph createJobGraph(String jobName) { configureCheckpointing(); + try { + InstantiationUtil.writeObjectToConfig( + this.streamGraph.getExecutionConfig(), + this.jobGraph.getJobConfiguration(), + ExecutionConfig.CONFIG_KEY); + } catch (IOException e) { + throw new RuntimeException("Config object could not be written to Job Configuration: " + e); --- End diff -- The indentation is off here ;)
[GitHub] flink pull request: [Flink-1985] Streaming does not correctly forw...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/682#issuecomment-104180508 I have some minor remarks which need some attention, otherwise good to merge.
[GitHub] flink pull request: Some fixes for Scala type analysis
Github user aljoscha commented on the pull request: https://github.com/apache/flink/pull/669#issuecomment-104182293 I didn't want to merge it without any comments. But please, go ahead. :smile:
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553811#comment-14553811 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30779835 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._ * At the moment, the whole partition is used for SGD, making it effectively a batch gradient * descent. Once a sampling operator has been introduced, the algorithm can be optimized * - * @param runParameters The parameters to tune the algorithm. Currently these include: - * [[Solver.LossFunction]] for the loss function to be used, - * [[Solver.RegularizationType]] for the type of regularization, - * [[Solver.RegularizationParameter]] for the regularization parameter, + * The parameters to tune the algorithm are: + * [[Solver.LossFunctionParameter]] for the loss function to be used, + * [[Solver.RegularizationTypeParameter]] for the type of regularization, + * [[Solver.RegularizationValueParameter]] for the regularization parameter, * [[IterativeSolver.Iterations]] for the maximum number of iteration, * [[IterativeSolver.Stepsize]] for the learning rate used. + * [[IterativeSolver.ConvergenceThreshold]] when provided the algorithm will + * stop the iterations if the change in the value of the objective + * function between successive iterations is is smaller than this value. */ -class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { +class GradientDescent() extends IterativeSolver() { --- End diff -- Do we need the parenthesis? 
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553808#comment-14553808 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30779810 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._ * At the moment, the whole partition is used for SGD, making it effectively a batch gradient * descent. Once a sampling operator has been introduced, the algorithm can be optimized * - * @param runParameters The parameters to tune the algorithm. Currently these include: - * [[Solver.LossFunction]] for the loss function to be used, - * [[Solver.RegularizationType]] for the type of regularization, - * [[Solver.RegularizationParameter]] for the regularization parameter, + * The parameters to tune the algorithm are: + * [[Solver.LossFunctionParameter]] for the loss function to be used, + * [[Solver.RegularizationTypeParameter]] for the type of regularization, + * [[Solver.RegularizationValueParameter]] for the regularization parameter, --- End diff -- Do we want to append the Parameter suffix to the parameter names? So far we haven't done that. Add convergence criterion to SGD optimizer -- Key: FLINK-1992 URL: https://issues.apache.org/jira/browse/FLINK-1992 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Priority: Minor Labels: ML Fix For: 0.9 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It would be good to support a dynamic convergence criterion, too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: From stream to h base
Github user HilmiYildirim commented on the pull request: https://github.com/apache/flink/pull/702#issuecomment-104171850 Good idea. I am working on it.
[GitHub] flink pull request: From stream to h base
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/702#issuecomment-104171814 That is a very nice idea!
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553832#comment-14553832 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30780392 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold) // Initialize weights -val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) = { -wvDS.map{wv = { - val denseWeights = wv.weights match { -case dv: DenseVector = dv -case sv: SparseVector = sv.toDenseVector +val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + +// Perform the iterations +val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None = +initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS = { +SGDStep(data, weightVectorDS) } - WeightVector(denseWeights, wv.intercept) } - + case Some(convergence) = +/** Calculates the regularized loss, from the data and given weights **/ +def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]): +DataSet[Double] = { + data.map { +new LossCalculation + }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) +.reduce { +(left, right) = + val (leftLoss, leftCount) = left + val (rightLoss, rightCount) = right + (leftLoss + rightLoss, rightCount + leftCount) + } +.map{new RegularizedLossCalculation} +.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) } - } - case None = createInitialWeightVector(dimensionsDS) -} - -// Perform the iterations -// TODO: Enable convergence stopping criterion, as in Multiple Linear regression -initialWeightsDS.iterate(numberOfIterations) { - weightVector = { -SGDStep(data, weightVector) - } +// We have to calculate for each weight vector the sum of squared residuals, +// and then sum them and apply regularization +val initialLossSumDS = lossCalculation(data, initialWeightsDS) + +// Combine weight vector with the current loss +val initialWeightsWithLossSum = initialWeightsDS. + crossWithTiny(initialLossSumDS).setParallelism(1) + +val resultWithLoss = initialWeightsWithLossSum. 
+ iterateWithTermination(numberOfIterations) { + weightsWithLossSum = + +// Extract weight vector and loss +val previousWeightsDS = weightsWithLossSum.map{_._1} +val previousLossSumDS = weightsWithLossSum.map{_._2} + +val currentWeightsDS = SGDStep(data, previousWeightsDS) + +val currentLossSumDS = lossCalculation(data, currentWeightsDS) + +// Check if the relative change in the loss is smaller than the +// convergence threshold. If yes, then terminate i.e. return empty termination data set +val termination =
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30781085 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala --- @@ -95,6 +95,7 @@ abstract class DiffRegularization extends Regularization { regParameter: Double) } +// TODO(tvas): I think NoRegularization should extend DiffRegularization --- End diff -- Why do you think?
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553855#comment-14553855 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30781214 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala --- @@ -118,6 +151,44 @@ abstract class IterativeSolver extends Solver { parameters.add(Stepsize, stepsize) this } + + def setConvergenceThreshold(convergenceThreshold: Double): IterativeSolver = { +parameters.add(ConvergenceThreshold, convergenceThreshold) +this + } + + /** Mapping function that calculates the weight gradients from the data. +* +*/ + protected class GradientCalculation extends --- End diff -- extends in next line with 4 spaces indentation if line break is necessary here. Add convergence criterion to SGD optimizer -- Key: FLINK-1992 URL: https://issues.apache.org/jira/browse/FLINK-1992 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Priority: Minor Labels: ML Fix For: 0.9 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It would be good to support a dynamic convergence criterion, too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2050) Add pipelining mechanism for chainable transformers and estimators
[ https://issues.apache.org/jira/browse/FLINK-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553879#comment-14553879 ] Till Rohrmann commented on FLINK-2050: -- Yes of course, where do I find it? Add pipelining mechanism for chainable transformers and estimators -- Key: FLINK-2050 URL: https://issues.apache.org/jira/browse/FLINK-2050 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Till Rohrmann Labels: ML Fix For: 0.9 The key concept of an easy to use ML library is the quick and simple construction of data analysis pipelines. Scikit-learn's approach to define transformers and estimators seems to be a really good solution to this problem. I propose to follow a similar path, because it makes FlinkML flexible in terms of code reuse as well as easy for people coming from Scikit-learn to use the FlinkML.
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/688#issuecomment-104185370 Thank you for writing this. Finally I get a better understanding of the overall status of the FlinkML stuff ;) +1 to merge.
[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs
[ https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553907#comment-14553907 ] ASF GitHub Bot commented on FLINK-2034: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/688#issuecomment-104185370 Thank you for writing this. Finally I get a better understanding of the overall status of the FlinkML stuff ;) +1 to merge. Add vision and roadmap for ML library to docs - Key: FLINK-2034 URL: https://issues.apache.org/jira/browse/FLINK-2034 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Theodore Vasiloudis Assignee: Theodore Vasiloudis Labels: ML Fix For: 0.9 We should have a document describing the vision of the Machine Learning library in Flink and an up to date roadmap.
[jira] [Commented] (FLINK-2050) Add pipelining mechanism for chainable transformers and estimators
[ https://issues.apache.org/jira/browse/FLINK-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553909#comment-14553909 ] ASF GitHub Bot commented on FLINK-2050: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/704#discussion_r30783847 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- @@ -22,38 +22,47 @@ import breeze.linalg import breeze.numerics.sqrt import breeze.numerics.sqrt._ import org.apache.flink.api.common.functions._ +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration -import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap} import org.apache.flink.ml.math.Breeze._ -import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} +import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation, Transformer} import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std} +import scala.reflect.ClassTag + /** Scales observations, so that all features have a user-specified mean and standard deviation. * By default for [[StandardScaler]] transformer mean=0.0 and std=1.0. * - * This transformer takes a [[Vector]] of values and maps it to a - * scaled [[Vector]] such that each feature has a user-specified mean and standard deviation. + * This transformer takes a subtype of [[Vector]] of values and maps it to a + * scaled subtype of [[Vector]] such that each feature has a user-specified mean and standard + * deviation. * * This transformer can be prepended to all [[Transformer]] and - * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of - * [[Vector]]. 
+ * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect as input a subtype + * of [[Vector]]. * * @example * {{{ *val trainingDS: DataSet[Vector] = env.fromCollection(data) *val transformer = StandardScaler().setMean(10.0).setStd(2.0) * - *transformer.transform(trainingDS) + *transformer.fit(trainingDS) + *val transformedDS = transformer.transform(trainingDS) * }}} * * =Parameters= * - * - [[StandardScaler.Mean]]: The mean value of transformed data set; by default equal to 0 - * - [[StandardScaler.Std]]: The standard deviation of the transformed data set; by default + * - [[Mean]]: The mean value of transformed data set; by default equal to 0 + * - [[Std]]: The standard deviation of the transformed data set; by default * equal to 1 --- End diff -- IntelliJ did it that way. I haven't figured out yet whether the full path is necessary or not. Add pipelining mechanism for chainable transformers and estimators -- Key: FLINK-2050 URL: https://issues.apache.org/jira/browse/FLINK-2050 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Till Rohrmann Labels: ML Fix For: 0.9 The key concept of an easy to use ML library is the quick and simple construction of data analysis pipelines. Scikit-learn's approach to define transformers and estimators seems to be a really good solution to this problem. I propose to follow a similar path, because it makes FlinkML flexible in terms of code reuse as well as easy for people coming from Scikit-learn to use the FlinkML. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
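The StandardScaler discussed in the diff above fits per-feature mean and standard deviation, then rescales each feature to a user-specified target mean and standard deviation (its scaladoc example uses `setMean(10.0).setStd(2.0)`). A Flink-free sketch of that computation on plain arrays — `fit` and `transform` here are simplified stand-ins for the pipeline operations, not the actual FlinkML API:

```scala
// Fit: per-feature mean and (population) standard deviation of the data.
def fit(data: Seq[Array[Double]]): (Array[Double], Array[Double]) = {
  val n = data.size.toDouble
  val dim = data.head.length
  val mean = Array.tabulate(dim)(j => data.map(_(j)).sum / n)
  val std = Array.tabulate(dim) { j =>
    math.sqrt(data.map(v => math.pow(v(j) - mean(j), 2)).sum / n)
  }
  (mean, std)
}

// Transform: z-score each feature, then shift/scale to the target mean and std.
def transform(data: Seq[Array[Double]],
              mean: Array[Double], std: Array[Double],
              targetMean: Double = 0.0, targetStd: Double = 1.0): Seq[Array[Double]] =
  data.map(_.zipWithIndex.map { case (x, j) =>
    (x - mean(j)) / std(j) * targetStd + targetMean
  })
```

This also illustrates why the diff splits the old single-call API into `fit` followed by `transform`: the statistics learned from the training set can be reused to transform other data sets consistently.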
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30783908 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold) // Initialize weights -val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) = { -wvDS.map{wv = { - val denseWeights = wv.weights match { -case dv: DenseVector = dv -case sv: SparseVector = sv.toDenseVector +val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + +// Perform the iterations +val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None = +initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS = { +SGDStep(data, weightVectorDS) } - WeightVector(denseWeights, wv.intercept) } - + case Some(convergence) = +/** Calculates the regularized loss, from the data and given weights **/ --- End diff -- I'm following the [Scala style guide](http://docs.scala-lang.org/style/scaladoc.html#general_style) here, should I change this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553911#comment-14553911 ] ASF GitHub Bot commented on FLINK-1992: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30783908 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold) // Initialize weights -val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) = { -wvDS.map{wv = { - val denseWeights = wv.weights match { -case dv: DenseVector = dv -case sv: SparseVector = sv.toDenseVector +val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + +// Perform the iterations +val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None = +initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS = { +SGDStep(data, weightVectorDS) } - WeightVector(denseWeights, wv.intercept) } - + case Some(convergence) = +/** Calculates the regularized loss, from the data and given weights **/ --- End diff -- I'm following the [Scala style guide](http://docs.scala-lang.org/style/scaladoc.html#general_style) here, should I change this? Add convergence criterion to SGD optimizer -- Key: FLINK-1992 URL: https://issues.apache.org/jira/browse/FLINK-1992 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Priority: Minor Labels: ML Fix For: 0.9 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It would be good to support a dynamic convergence criterion, too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2050] Introduces new pipelining mechani...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/704#discussion_r30783847 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala --- @@ -22,38 +22,47 @@ import breeze.linalg import breeze.numerics.sqrt import breeze.numerics.sqrt._ import org.apache.flink.api.common.functions._ +import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.scala._ import org.apache.flink.configuration.Configuration -import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer} +import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap} import org.apache.flink.ml.math.Breeze._ -import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.math.{BreezeVectorConverter, Vector} +import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation, Transformer} import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std} +import scala.reflect.ClassTag + /** Scales observations, so that all features have a user-specified mean and standard deviation. * By default for [[StandardScaler]] transformer mean=0.0 and std=1.0. * - * This transformer takes a [[Vector]] of values and maps it to a - * scaled [[Vector]] such that each feature has a user-specified mean and standard deviation. + * This transformer takes a subtype of [[Vector]] of values and maps it to a + * scaled subtype of [[Vector]] such that each feature has a user-specified mean and standard + * deviation. * * This transformer can be prepended to all [[Transformer]] and - * [[org.apache.flink.ml.common.Learner]] implementations which expect an input of - * [[Vector]]. + * [[org.apache.flink.ml.pipeline.Predictor]] implementations which expect as input a subtype + * of [[Vector]]. 
* * @example * {{{ *val trainingDS: DataSet[Vector] = env.fromCollection(data) *val transformer = StandardScaler().setMean(10.0).setStd(2.0) * - *transformer.transform(trainingDS) + *transformer.fit(trainingDS) + *val transformedDS = transformer.transform(trainingDS) * }}} * * =Parameters= * - * - [[StandardScaler.Mean]]: The mean value of transformed data set; by default equal to 0 - * - [[StandardScaler.Std]]: The standard deviation of the transformed data set; by default + * - [[Mean]]: The mean value of transformed data set; by default equal to 0 + * - [[Std]]: The standard deviation of the transformed data set; by default * equal to 1 --- End diff -- IntelliJ did it that way. I haven't figured out yet whether the full path is necessary or not. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
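The fit/transform semantics shown in the example above can be illustrated with a plain-Scala sketch of what standard scaling computes; the names below are ours, not Flink ML's actual StandardScaler implementation:

```scala
// Sketch of standard scaling on a single feature column: "fit" learns the
// data's mean and standard deviation, "transform" rescales the values to a
// user-specified target mean and standard deviation. Assumes non-constant
// input (std > 0). Plain Scala; the real transformer works on DataSets.
object ScalerSketch {
  /** Returns (mean, std) of the input values ("fit"). */
  def fit(xs: Seq[Double]): (Double, Double) = {
    val mean = xs.sum / xs.length
    val std  = math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / xs.length)
    (mean, std)
  }

  /** Rescales values so they have the target mean/std ("transform"). */
  def transform(xs: Seq[Double], targetMean: Double, targetStd: Double): Seq[Double] = {
    val (mean, std) = fit(xs)
    xs.map(x => (x - mean) / std * targetStd + targetMean)
  }
}
```

Because the transform is affine, the output's mean and standard deviation are exactly the requested targets, mirroring `setMean(10.0).setStd(2.0)` in the example.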
[GitHub] flink pull request: [FLINK-2050] Introduces new pipelining mechani...
Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/704#issuecomment-104186229

Documentation :white_check_mark:
Scaladocs :white_check_mark:
Tests :white_check_mark:
No code outside of flink-ml touched :white_check_mark:

:ship: it
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30784142

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
@@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
  * At the moment, the whole partition is used for SGD, making it effectively a batch gradient
  * descent. Once a sampling operator has been introduced, the algorithm can be optimized
  *
- * @param runParameters The parameters to tune the algorithm. Currently these include:
- *                      [[Solver.LossFunction]] for the loss function to be used,
- *                      [[Solver.RegularizationType]] for the type of regularization,
- *                      [[Solver.RegularizationParameter]] for the regularization parameter,
+ * The parameters to tune the algorithm are:
+ *                      [[Solver.LossFunctionParameter]] for the loss function to be used,
+ *                      [[Solver.RegularizationTypeParameter]] for the type of regularization,
+ *                      [[Solver.RegularizationValueParameter]] for the regularization parameter,
--- End diff --

I'm doing this to avoid confusion between the LossFunction class and the parameter setting here. Having the same name creates some confusion, I think, especially when auto-completing in IntelliJ. However, the names are starting to get pretty long. Do you think we should remove the suffix?
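The naming question above concerns Flink ML's parameter-object pattern: a singleton parameter key with a default value, looked up through a parameter map. A hypothetical plain-Scala sketch of that pattern (names and signatures are illustrative, not Flink ML's exact API):

```scala
// Hypothetical sketch of the parameter-object pattern under discussion.
// A Parameter key carries a default; a ParameterMap stores overrides and
// falls back to the default on lookup. Names are ours.
trait Parameter[T] {
  val defaultValue: Option[T]
}

class ParameterMap {
  private var map = Map.empty[Parameter[_], Any]

  def add[T](p: Parameter[T], value: T): ParameterMap = {
    map += (p -> value)
    this
  }

  // Explicit override wins; otherwise the parameter's default is used.
  def get[T](p: Parameter[T]): Option[T] =
    map.get(p).map(_.asInstanceOf[T]).orElse(p.defaultValue)
}

// The "Parameter" suffix debated above distinguishes this key object from
// a class of the same base name (e.g. a RegularizationValue concept).
case object RegularizationValueParameter extends Parameter[Double] {
  val defaultValue: Option[Double] = Some(0.0)
}
```

The suffix trades brevity for avoiding a name clash with same-named classes during auto-completion, which is exactly the tension voiced in the thread.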
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784369

--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License.

+The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink community. Our goal is to design and implement a system that is scalable and can deal with problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue code that developers are forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same program with FlinkML, allowing the development of robust pipelines without the need to use yet another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end we will be providing detailed documentation along with examples for every part of the system. Our aim is that developers will be able to get started with writing their ML pipelines quickly, using familiar programming concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively executes iterative processing algorithms which are common in ML. We plan to exploit the streaming nature of Flink, and provide functionality designed specifically for data streams.
+
+FlinkML will allow data scientists to test their models locally and using subsets of data, and then use the same code to run their algorithms at a much larger scale in a cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in particular [scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Spark's [MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and cluster sizes.
--- End diff --

Do you think we need this paragraph here?
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553945#comment-14553945 ] ASF GitHub Bot commented on FLINK-1992: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30784817 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)

 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) => {
-    wvDS.map{ wv => {
-      val denseWeights = wv.weights match {
-        case dv: DenseVector => dv
-        case sv: SparseVector => sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =>
+    initialWeightsDS.iterate(numberOfIterations) {
+      weightVectorDS => {
+        SGDStep(data, weightVectorDS)
       }
-      WeightVector(denseWeights, wv.intercept)
     }
-
+  case Some(convergence) =>
+    /** Calculates the regularized loss, from the data and given weights **/
+    def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
+        DataSet[Double] = {
+      data.map {
+        new LossCalculation
+      }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+        .reduce {
+          (left, right) =>
+            val (leftLoss, leftCount) = left
+            val (rightLoss, rightCount) = right
+            (leftLoss + rightLoss, rightCount + leftCount)
+        }
+        .map{ new RegularizedLossCalculation }
+        .withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
     }
-  }
-  case None => createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector => {
-    SGDStep(data, weightVector)
-  }
+    // We have to calculate for each weight vector the sum of squared residuals,
+    // and then sum them and apply regularization
+    val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+    // Combine weight vector with the current loss
+    val initialWeightsWithLossSum = initialWeightsDS.
+      crossWithTiny(initialLossSumDS).setParallelism(1)
+
+    val resultWithLoss = initialWeightsWithLossSum.
+      iterateWithTermination(numberOfIterations) {
+        weightsWithLossSum =>
+
+          // Extract weight vector and loss
+          val previousWeightsDS = weightsWithLossSum.map{ _._1 }
+          val previousLossSumDS = weightsWithLossSum.map{ _._2 }
+
+          val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+          val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+          // Check if the relative change in the loss is smaller than the
+          // convergence threshold. If yes, then terminate i.e. return empty termination data set
+          val termination =
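Flink's `iterateWithTermination`, used in the diff above, runs each step and stops early once the termination data set comes back empty. A plain-Scala sketch of that control flow, with illustrative names and no Flink dependency:

```scala
// Sketch of the iterate-with-termination pattern: each step returns the
// next state plus a "termination set"; iteration stops when that set is
// empty, mimicking Flink's iterateWithTermination. Names are ours.
object IterateSketch {
  def iterateWithTermination[S](initial: S, maxIterations: Int)(
      step: S => (S, Seq[Unit])): S = {
    var state = initial
    var i = 0
    var continue = true
    while (continue && i < maxIterations) {
      val (next, termination) = step(state)
      state = next
      continue = termination.nonEmpty // empty termination set => converged
      i += 1
    }
    state
  }
}
```

For example, halving a loss value each step and emitting an empty termination set once it drops below 0.01 stops the loop well before `maxIterations`, just as the convergence threshold in the PR cuts SGD short.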
[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs
[ https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553951#comment-14553951 ]

ASF GitHub Bot commented on FLINK-2034:
---

Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30785023

--- Diff: docs/libs/ml/index.md ---
@@ -30,7 +122,7 @@ under the License.
   </dependency>
{% endhighlight %}

-## Algorithms
+## Algorithm Documentation
--- End diff --

I think we should move the algorithms a bit more prominently to the top of the page.

Add vision and roadmap for ML library to docs
---------------------------------------------

                 Key: FLINK-2034
                 URL: https://issues.apache.org/jira/browse/FLINK-2034
             Project: Flink
          Issue Type: Improvement
          Components: Machine Learning Library
            Reporter: Theodore Vasiloudis
            Assignee: Theodore Vasiloudis
              Labels: ML
             Fix For: 0.9

We should have a document describing the vision of the Machine Learning library in Flink and an up-to-date roadmap.
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/688#issuecomment-104193448

Really nice text @thvasilo. I think it's a great introduction. But I also think that we should change the outline of our starting page a little. IMO, it should be governed by what a user expects from such a starting site. If I were a new user, I would expect something along the lines of:

1. Short introduction (your text is good if we condense the last paragraphs)
2. List of supported algorithms
3. Getting started (what dependencies to include etc.)
4. Tutorial/Example use case
5. Roadmap

I'm not so sure whether the list of algorithms should be number 2, but that's something to discuss. What do you think?
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30785863

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala ---
@@ -40,6 +44,32 @@ abstract class Solver extends Serializable with WithParameters {
     data: DataSet[LabeledVector],
     initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector]

+  /** Creates initial weights vector, creating a DataSet with a WeightVector element
+    *
+    * @param initialWeights An Option that may contain an initial set of weights
+    * @param data The data for which we optimize the weights
+    * @return A DataSet containing a single WeightVector element
+    */
+  def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]],
+      data: DataSet[LabeledVector]): DataSet[WeightVector] = {
+    // TODO: Faster way to do this?
+    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
+
+    initialWeights match {
+      // Ensure provided weight vector is a DenseVector
+      case Some(wvDS) =>
+        wvDS.map { wv => {
+          val denseWeights = wv.weights match {
+            case dv: DenseVector => dv
+            case sv: SparseVector => sv.toDenseVector
+          }
+          WeightVector(denseWeights, wv.intercept)
--- End diff --

Not sure what you mean here, is it `WeightVector(denseWeights, wv.intercept)` that is off?
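The pattern match quoted above normalizes any provided weight vector to a dense representation before optimization starts. A self-contained sketch of the same idea, using stand-in vector types rather than Flink ML's:

```scala
// Stand-in vector types for illustration; Flink ML's DenseVector and
// SparseVector differ, but the normalization logic is the same.
sealed trait Vec { def toDense: DenseVec }

case class DenseVec(values: Array[Double]) extends Vec {
  def toDense: DenseVec = this
}

case class SparseVec(size: Int, indices: Array[Int], values: Array[Double]) extends Vec {
  // Expand the sparse entries into a full-length dense array.
  def toDense: DenseVec = {
    val out = new Array[Double](size)
    indices.zip(values).foreach { case (i, v) => out(i) = v }
    DenseVec(out)
  }
}

object DenseWeights {
  // Mirrors the match in createInitialWeightsDS: whatever representation
  // the caller supplies, the optimizer always starts from dense weights.
  def ensureDense(v: Vec): DenseVec = v match {
    case dv: DenseVec  => dv
    case sv: SparseVec => sv.toDense
  }
}
```

Converting once up front lets the inner SGD loop assume dense arithmetic instead of branching on the representation at every step.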
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30786437 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)

 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) => {
-    wvDS.map{ wv => {
-      val denseWeights = wv.weights match {
-        case dv: DenseVector => dv
-        case sv: SparseVector => sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =>
+    initialWeightsDS.iterate(numberOfIterations) {
+      weightVectorDS => {
+        SGDStep(data, weightVectorDS)
      }
-      WeightVector(denseWeights, wv.intercept)
     }
-
+  case Some(convergence) =>
+    /** Calculates the regularized loss, from the data and given weights **/
--- End diff --

What you've written is ScalaDoc. Usually that is only used for classes/objects/packages/methods, not for comments inside a method body.
[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs
[ https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553976#comment-14553976 ]

ASF GitHub Bot commented on FLINK-2034:
---

Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30786608

--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License.

+The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink community. Our goal is to design and implement a system that is scalable and can deal with problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue code that developers are forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same program with FlinkML, allowing the development of robust pipelines without the need to use yet another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end we will be providing detailed documentation along with examples for every part of the system. Our aim is that developers will be able to get started with writing their ML pipelines quickly, using familiar programming concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively executes iterative processing algorithms which are common in ML. We plan to exploit the streaming nature of Flink, and provide functionality designed specifically for data streams.
+
+FlinkML will allow data scientists to test their models locally and using subsets of data, and then use the same code to run their algorithms at a much larger scale in a cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in particular [scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Spark's [MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will continue to extend the library with more algorithms. An example of how simple it is to create a learning model in FlinkML is given below:
--- End diff --

A separate example section is a good idea. Still, I would like to keep this very small example here, to make it clear that getting up and running with the library is just a few lines of code.
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553975#comment-14553975 ]

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30786580

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala ---
@@ -40,6 +44,32 @@ abstract class Solver extends Serializable with WithParameters {
     data: DataSet[LabeledVector],
     initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector]

+  /** Creates initial weights vector, creating a DataSet with a WeightVector element
+    *
+    * @param initialWeights An Option that may contain an initial set of weights
+    * @param data The data for which we optimize the weights
+    * @return A DataSet containing a single WeightVector element
+    */
+  def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]],
+      data: DataSet[LabeledVector]): DataSet[WeightVector] = {
+    // TODO: Faster way to do this?
+    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
+
+    initialWeights match {
+      // Ensure provided weight vector is a DenseVector
+      case Some(wvDS) =>
+        wvDS.map { wv => {
+          val denseWeights = wv.weights match {
+            case dv: DenseVector => dv
+            case sv: SparseVector => sv.toDenseVector
+          }
+          WeightVector(denseWeights, wv.intercept)
--- End diff --

If you put `wv => {` on a new line and indent its block, then it's fine.
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553915#comment-14553915 ]

ASF GitHub Bot commented on FLINK-1992:
---------------------------------------

Github user thvasilo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/692#discussion_r30783963

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
    @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
         }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
       }
    +
    +  /** Provides a solution for the given optimization problem
        *
        * @param data A Dataset of LabeledVector (label, features) pairs
    -   * @param initWeights The initial weights that will be optimized
    +   * @param initialWeights The initial weights that will be optimized
        * @return The weights, optimized for the provided data.
        */
       override def optimize(
         data: DataSet[LabeledVector],
    -    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    -    // TODO: Faster way to do this?
    -    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
    -
    -    val numberOfIterations: Int = parameterMap(Iterations)
    +    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    +    val numberOfIterations: Int = parameters(Iterations)
    +    // TODO(tvas): This looks out of place, why don't we get back an Option from
    +    // parameters(ConvergenceThreshold)?
    +    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)

         // Initialize weights
    -    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
    -      // Ensure provided weight vector is a DenseVector
    -      case Some(wvDS) => {
    -        wvDS.map{wv => {
    -          val denseWeights = wv.weights match {
    -            case dv: DenseVector => dv
    -            case sv: SparseVector => sv.toDenseVector
    +    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
    +
    +    // Perform the iterations
    +    val optimizedWeights = convergenceThresholdOption match {
    +      // No convergence criterion
    +      case None =>
    +        initialWeightsDS.iterate(numberOfIterations) {
    +          weightVectorDS => {
    +            SGDStep(data, weightVectorDS)
               }
    -          WeightVector(denseWeights, wv.intercept)
             }
    -
    +      case Some(convergence) =>
    +        /** Calculates the regularized loss, from the data and given weights **/
    +        def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
    +            DataSet[Double] = {
    +          data.map {
    +            new LossCalculation
    +          }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
    +          .reduce {
    +            (left, right) =>
    +              val (leftLoss, leftCount) = left
    +              val (rightLoss, rightCount) = right
    +              (leftLoss + rightLoss, rightCount + leftCount)
    +          }
    +          .map{new RegularizedLossCalculation}
    +          .withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
             }
    -      }
    -      case None => createInitialWeightVector(dimensionsDS)
    -    }
    -
    -    // Perform the iterations
    -    // TODO: Enable convergence stopping criterion, as in Multiple Linear regression
    -    initialWeightsDS.iterate(numberOfIterations) {
    -      weightVector => {
    -        SGDStep(data, weightVector)
    -      }
    +        // We have to calculate for each weight vector the sum of squared residuals,
    +        // and then sum them and apply regularization
    +        val initialLossSumDS = lossCalculation(data, initialWeightsDS)
    +
    +        // Combine weight vector with the current loss
    +        val initialWeightsWithLossSum = initialWeightsDS.
    +          crossWithTiny(initialLossSumDS).setParallelism(1)
    +
    +        val resultWithLoss = initialWeightsWithLossSum.
    +          iterateWithTermination(numberOfIterations) {
    +            weightsWithLossSum =>
    +
    +              // Extract weight vector and loss
    +              val previousWeightsDS = weightsWithLossSum.map{_._1}
    +              val previousLossSumDS = weightsWithLossSum.map{_._2}
    +
    +              val currentWeightsDS = SGDStep(data, previousWeightsDS)
    +
    +              val currentLossSumDS = lossCalculation(data, currentWeightsDS)
    +
    +              // Check if the relative change in the loss is smaller than the
    +              // convergence threshold. If yes, then terminate i.e. return empty termination data set
    +              val termination =
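For reference, the relative-change test the hunk above is building toward can be sketched in plain Scala, without the Flink `DataSet` machinery (the function names and the exact guard for a non-positive previous loss are illustrative, since the diff is truncated here):

```scala
// Illustrative sketch of a relative-change convergence test: stop once
// the relative improvement of the regularized loss falls below the
// threshold. The guard avoids dividing by a non-positive previous loss.
def converged(previousLoss: Double, currentLoss: Double, threshold: Double): Boolean =
  if (previousLoss <= 0) false
  else math.abs((previousLoss - currentLoss) / previousLoss) < threshold

// Driver loop analogous to iterateWithTermination with an iteration cap:
// consume successive loss values until converged or the cap is reached.
def lastLoss(losses: Iterator[Double], maxIterations: Int, threshold: Double): Double = {
  var previous = losses.next()
  var i = 0
  while (i < maxIterations && losses.hasNext) {
    val current = losses.next()
    if (converged(previous, current, threshold)) return current
    previous = current
    i += 1
  }
  previous
}
```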
[jira] [Commented] (FLINK-2050) Add pipelining mechanism for chainable transformers and estimators
[ https://issues.apache.org/jira/browse/FLINK-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553914#comment-14553914 ] ASF GitHub Bot commented on FLINK-2050: --- Github user rmetzger commented on the pull request: https://github.com/apache/flink/pull/704#issuecomment-104186229 Documentation :white_check_mark: Scaladocs :white_check_mark: Tests :white_check_mark: No code outside of flink-ml touched :white_check_mark: :ship: it Add pipelining mechanism for chainable transformers and estimators -- Key: FLINK-2050 URL: https://issues.apache.org/jira/browse/FLINK-2050 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Till Rohrmann Labels: ML Fix For: 0.9 The key concept of an easy to use ML library is the quick and simple construction of data analysis pipelines. Scikit-learn's approach to define transformers and estimators seems to be a really good solution to this problem. I propose to follow a similar path, because it makes FlinkML flexible in terms of code reuse as well as easy for people coming from Scikit-learn to use the FlinkML. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
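The scikit-learn-style chaining the issue describes can be sketched in a few lines of plain Scala (illustrative only; the trait and method names here are not the actual FlinkML API):

```scala
// Minimal sketch of chainable pipeline stages: a Transformer maps data
// to data, and chaining composes two transformers into one stage that
// applies them in order.
trait Transformer[A, B] { self =>
  def transform(input: A): B

  def chainTransformer[C](next: Transformer[B, C]): Transformer[A, C] =
    new Transformer[A, C] {
      def transform(input: A): C = next.transform(self.transform(input))
    }
}

val scale = new Transformer[Seq[Double], Seq[Double]] {
  def transform(input: Seq[Double]): Seq[Double] = input.map(_ * 2.0)
}
val shift = new Transformer[Seq[Double], Seq[Double]] {
  def transform(input: Seq[Double]): Seq[Double] = input.map(_ + 1.0)
}

// The composed pipeline scales first, then shifts.
val pipeline = scale.chainTransformer(shift)
```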
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user thvasilo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/692#discussion_r30784001

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
    @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
         }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
       }
    +
    +  /** Provides a solution for the given optimization problem
        *
        * @param data A Dataset of LabeledVector (label, features) pairs
    -   * @param initWeights The initial weights that will be optimized
    +   * @param initialWeights The initial weights that will be optimized
        * @return The weights, optimized for the provided data.
        */
       override def optimize(
         data: DataSet[LabeledVector],
    -    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    -    // TODO: Faster way to do this?
    -    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
    -
    -    val numberOfIterations: Int = parameterMap(Iterations)
    +    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
    +    val numberOfIterations: Int = parameters(Iterations)
    +    // TODO(tvas): This looks out of place, why don't we get back an Option from
    +    // parameters(ConvergenceThreshold)?
    +    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)

         // Initialize weights
    -    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
    -      // Ensure provided weight vector is a DenseVector
    -      case Some(wvDS) => {
    -        wvDS.map{wv => {
    -          val denseWeights = wv.weights match {
    -            case dv: DenseVector => dv
    -            case sv: SparseVector => sv.toDenseVector
    +    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
    +
    +    // Perform the iterations
    +    val optimizedWeights = convergenceThresholdOption match {
    +      // No convergence criterion
    +      case None =>
    +        initialWeightsDS.iterate(numberOfIterations) {
    +          weightVectorDS => {
    +            SGDStep(data, weightVectorDS)
               }
    -          WeightVector(denseWeights, wv.intercept)
             }
    -
    +      case Some(convergence) =>
    +        /** Calculates the regularized loss, from the data and given weights **/
    +        def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
    +            DataSet[Double] = {
    +          data.map {
    +            new LossCalculation
    +          }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
    +          .reduce {
    +            (left, right) =>
    +              val (leftLoss, leftCount) = left
    +              val (rightLoss, rightCount) = right
    +              (leftLoss + rightLoss, rightCount + leftCount)
    +          }
    +          .map{new RegularizedLossCalculation}
    +          .withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
             }
    -      }
    -      case None => createInitialWeightVector(dimensionsDS)
    -    }
    -
    -    // Perform the iterations
    -    // TODO: Enable convergence stopping criterion, as in Multiple Linear regression
    -    initialWeightsDS.iterate(numberOfIterations) {
    -      weightVector => {
    -        SGDStep(data, weightVector)
    -      }
    +        // We have to calculate for each weight vector the sum of squared residuals,
    +        // and then sum them and apply regularization
    +        val initialLossSumDS = lossCalculation(data, initialWeightsDS)
    +
    +        // Combine weight vector with the current loss
    +        val initialWeightsWithLossSum = initialWeightsDS.
    +          crossWithTiny(initialLossSumDS).setParallelism(1)
    +
    +        val resultWithLoss = initialWeightsWithLossSum.
    +          iterateWithTermination(numberOfIterations) {
    +            weightsWithLossSum =>
    +
    +              // Extract weight vector and loss
    +              val previousWeightsDS = weightsWithLossSum.map{_._1}
    +              val previousLossSumDS = weightsWithLossSum.map{_._2}
    +
    +              val currentWeightsDS = SGDStep(data, previousWeightsDS)
    +
    +              val currentLossSumDS = lossCalculation(data, currentWeightsDS)
    +
    +              // Check if the relative change in the loss is smaller than the
    +              // convergence threshold. If yes, then terminate i.e. return empty termination data set
    +              val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
    +                filter{
    +                  pair => {
    +                    val (previousLoss, currentLoss) = pair
    +
    +                    if (previousLoss <= 0) {
    +                      false
    +                    } else
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784160 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. + +Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively +executes iterative processing algorithms which are common in ML. We plan to exploit the streaming +nature of Flink, and provide functionality designed specifically for data streams. + +FlinkML will allow data scientists to test their models locally and using subsets of data, and then +use the same code to run their algorithms at a much larger scale in a cluster setting. 
    +
    +We are inspired by other open source efforts to provide ML systems, in particular
    +[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Spark's
    +[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and
    +cluster sizes.
    +
    +We already have some of the building blocks for FlinkML in place, and will continue to extend the
    +library with more algorithms. An example of how simple it is to create a learning model in
    +FlinkML is given below:
    +
    +{% highlight scala %}
    +// LabelbedVector is a feature vector with a label (class or real value)
    +val data: DataSet[LabelVector] = ...

    --- End diff --

    `LabeledVector`
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784150 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. + +Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively +executes iterative processing algorithms which are common in ML. We plan to exploit the streaming +nature of Flink, and provide functionality designed specifically for data streams. + +FlinkML will allow data scientists to test their models locally and using subsets of data, and then +use the same code to run their algorithms at a much larger scale in a cluster setting. 
    +
    +We are inspired by other open source efforts to provide ML systems, in particular
    +[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Spark's
    +[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and
    +cluster sizes.
    +
    +We already have some of the building blocks for FlinkML in place, and will continue to extend the
    +library with more algorithms. An example of how simple it is to create a learning model in
    +FlinkML is given below:
    +
    +{% highlight scala %}
    +// LabelbedVector is a feature vector with a label (class or real value)

    --- End diff --

    typo: `LabeledVector`
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user thvasilo commented on a diff in the pull request:

    https://github.com/apache/flink/pull/692#discussion_r30784205

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
    @@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
      * At the moment, the whole partition is used for SGD, making it effectively a batch gradient
      * descent. Once a sampling operator has been introduced, the algorithm can be optimized
      *
    - * @param runParameters The parameters to tune the algorithm. Currently these include:
    - *                      [[Solver.LossFunction]] for the loss function to be used,
    - *                      [[Solver.RegularizationType]] for the type of regularization,
    - *                      [[Solver.RegularizationParameter]] for the regularization parameter,
    + * The parameters to tune the algorithm are:
    + *                      [[Solver.LossFunctionParameter]] for the loss function to be used,
    + *                      [[Solver.RegularizationTypeParameter]] for the type of regularization,
    + *                      [[Solver.RegularizationValueParameter]] for the regularization parameter,
      *                      [[IterativeSolver.Iterations]] for the maximum number of iteration,
      *                      [[IterativeSolver.Stepsize]] for the learning rate used.
    + *                      [[IterativeSolver.ConvergenceThreshold]] when provided the algorithm will
    + *                      stop the iterations if the change in the value of the objective
    + *                      function between successive iterations is is smaller than this value.
      */
    -class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
    +class GradientDescent() extends IterativeSolver() {

    --- End diff --

    Nope, I'll remove these here and in the other occurrence.
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553918#comment-14553918 ] ASF GitHub Bot commented on FLINK-1992: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30784142 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._ * At the moment, the whole partition is used for SGD, making it effectively a batch gradient * descent. Once a sampling operator has been introduced, the algorithm can be optimized * - * @param runParameters The parameters to tune the algorithm. Currently these include: - * [[Solver.LossFunction]] for the loss function to be used, - * [[Solver.RegularizationType]] for the type of regularization, - * [[Solver.RegularizationParameter]] for the regularization parameter, + * The parameters to tune the algorithm are: + * [[Solver.LossFunctionParameter]] for the loss function to be used, + * [[Solver.RegularizationTypeParameter]] for the type of regularization, + * [[Solver.RegularizationValueParameter]] for the regularization parameter, --- End diff -- I'm doing this to avoid confusion between the LossFunction class and the parameter setting here. Having the same name I think creates some confusion, especially when auto-completing stuff in Intellij. However the names are starting to get pretty long. You think we should remove this? Add convergence criterion to SGD optimizer -- Key: FLINK-1992 URL: https://issues.apache.org/jira/browse/FLINK-1992 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Priority: Minor Labels: ML Fix For: 0.9 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It would be good to support a dynamic convergence criterion, too. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs
[ https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553919#comment-14553919 ] ASF GitHub Bot commented on FLINK-2034: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784150 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. + +Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively +executes iterative processing algorithms which are common in ML. We plan to exploit the streaming +nature of Flink, and provide functionality designed specifically for data streams. 
+ +FlinkML will allow data scientists to test their models locally and using subsets of data, and then +use the same code to run their algorithms at a much larger scale in a cluster setting. + +We are inspired by other open source efforts to provide ML systems, in particular +[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Spark’s +[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and +cluster sizes. + +We already have some of the building blocks for FlinkML in place, and will continue to extend the +library with more algorithms. An example of how simple it is to create a learning model in +FlinkML is given below: + +{% highlight scala %} +// LabelbedVector is a feature vector with a label (class or real value) --- End diff -- typo: `LabeledVector` Add vision and roadmap for ML library to docs - Key: FLINK-2034 URL: https://issues.apache.org/jira/browse/FLINK-2034 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Theodore Vasiloudis Assignee: Theodore Vasiloudis Labels: ML Fix For: 0.9 We should have a document describing the vision of the Machine Learning library in Flink and an up to date roadmap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553924#comment-14553924 ] ASF GitHub Bot commented on FLINK-1992: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30784205 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._ * At the moment, the whole partition is used for SGD, making it effectively a batch gradient * descent. Once a sampling operator has been introduced, the algorithm can be optimized * - * @param runParameters The parameters to tune the algorithm. Currently these include: - * [[Solver.LossFunction]] for the loss function to be used, - * [[Solver.RegularizationType]] for the type of regularization, - * [[Solver.RegularizationParameter]] for the regularization parameter, + * The parameters to tune the algorithm are: + * [[Solver.LossFunctionParameter]] for the loss function to be used, + * [[Solver.RegularizationTypeParameter]] for the type of regularization, + * [[Solver.RegularizationValueParameter]] for the regularization parameter, * [[IterativeSolver.Iterations]] for the maximum number of iteration, * [[IterativeSolver.Stepsize]] for the learning rate used. + * [[IterativeSolver.ConvergenceThreshold]] when provided the algorithm will + * stop the iterations if the change in the value of the objective + * function between successive iterations is is smaller than this value. */ -class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { +class GradientDescent() extends IterativeSolver() { --- End diff -- Nope, I'll remove these here and in the other occurrence. 
Add convergence criterion to SGD optimizer -- Key: FLINK-1992 URL: https://issues.apache.org/jira/browse/FLINK-1992 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Priority: Minor Labels: ML Fix For: 0.9 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It would be good to support a dynamic convergence criterion, too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs
[ https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553921#comment-14553921 ] ASF GitHub Bot commented on FLINK-2034: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784160 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. + +Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively +executes iterative processing algorithms which are common in ML. We plan to exploit the streaming +nature of Flink, and provide functionality designed specifically for data streams. 
+ +FlinkML will allow data scientists to test their models locally and using subsets of data, and then +use the same code to run their algorithms at a much larger scale in a cluster setting. + +We are inspired by other open source efforts to provide ML systems, in particular +[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Spark’s +[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and +cluster sizes. + +We already have some of the building blocks for FlinkML in place, and will continue to extend the +library with more algorithms. An example of how simple it is to create a learning model in +FlinkML is given below: + +{% highlight scala %} +// LabelbedVector is a feature vector with a label (class or real value) +val data: DataSet[LabelVector] = ... --- End diff -- `LabeledVector` Add vision and roadmap for ML library to docs - Key: FLINK-2034 URL: https://issues.apache.org/jira/browse/FLINK-2034 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Theodore Vasiloudis Assignee: Theodore Vasiloudis Labels: ML Fix For: 0.9 We should have a document describing the vision of the Machine Learning library in Flink and an up to date roadmap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784183 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. + +Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively +executes iterative processing algorithms which are common in ML. We plan to exploit the streaming +nature of Flink, and provide functionality designed specifically for data streams. + +FlinkML will allow data scientists to test their models locally and using subsets of data, and then +use the same code to run their algorithms at a much larger scale in a cluster setting. 
    +
    +We are inspired by other open source efforts to provide ML systems, in particular
    +[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Spark's
    +[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and
    +cluster sizes.
    +
    +We already have some of the building blocks for FlinkML in place, and will continue to extend the
    +library with more algorithms. An example of how simple it is to create a learning model in
    +FlinkML is given below:
    +
    +{% highlight scala %}
    +// LabelbedVector is a feature vector with a label (class or real value)
    +val data: DataSet[LabelVector] = ...
    +
    +val learner = MultipleLinearRegression()
    +
    +val parameters = ParameterMap()
    +  .add(MultipleLinearRegression.Stepsize, 1.0)
    +  .add(MultipleLinearRegression.Iterations, 10)
    +  .add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
    +
    +val model = learner.fit(data, parameters)

    --- End diff --

    With the new pipelining, this has to be updated.
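As a rough, self-contained sketch of the setter-based configuration style the pipelining work favors (the class, setter names, and return shapes here are assumptions for illustration, not the actual FlinkML signatures):

```scala
// Hypothetical setter-based configuration in the pipelining style;
// this mimics only the shape of the fluent API, not the real learner.
class MultipleLinearRegression {
  private var stepsize: Double = 0.1
  private var iterations: Int = 10

  // Each setter returns `this` so calls can be chained fluently.
  def setStepsize(stepsize: Double): this.type = { this.stepsize = stepsize; this }
  def setIterations(iterations: Int): this.type = { this.iterations = iterations; this }

  def currentParameters: (Double, Int) = (stepsize, iterations)
}

object MultipleLinearRegression {
  def apply(): MultipleLinearRegression = new MultipleLinearRegression
}

val learner = MultipleLinearRegression()
  .setStepsize(1.0)
  .setIterations(10)
```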
[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs
[ https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553923#comment-14553923 ] ASF GitHub Bot commented on FLINK-2034: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784183 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. + +Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively +executes iterative processing algorithms which are common in ML. We plan to exploit the streaming +nature of Flink, and provide functionality designed specifically for data streams. 
+ +FlinkML will allow data scientists to test their models locally and using subsets of data, and then +use the same code to run their algorithms at a much larger scale in a cluster setting. + +We are inspired by other open source efforts to provide ML systems, in particular +[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Spark’s +[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and +cluster sizes. + +We already have some of the building blocks for FlinkML in place, and will continue to extend the +library with more algorithms. An example of how simple it is to create a learning model in +FlinkML is given below: + +{% highlight scala %} +// LabelbedVector is a feature vector with a label (class or real value) +val data: DataSet[LabelVector] = ... + +val learner = MultipleLinearRegression() + +val parameters = ParameterMap() + .add(MultipleLinearRegression.Stepsize, 1.0) + .add(MultipleLinearRegression.Iterations, 10) + .add(MultipleLinearRegression.ConvergenceThreshold, 0.001) + +val model = learner.fit(data, parameters) --- End diff -- With the new pipelining, this has to be updated. Add vision and roadmap for ML library to docs - Key: FLINK-2034 URL: https://issues.apache.org/jira/browse/FLINK-2034 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Theodore Vasiloudis Assignee: Theodore Vasiloudis Labels: ML Fix For: 0.9 We should have a document describing the vision of the Machine Learning library in Flink and an up to date roadmap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs
[ https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553929#comment-14553929 ] ASF GitHub Bot commented on FLINK-2034: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784369 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. + +Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively +executes iterative processing algorithms which are common in ML. We plan to exploit the streaming +nature of Flink, and provide functionality designed specifically for data streams. 
+ +FlinkML will allow data scientists to test their models locally and using subsets of data, and then +use the same code to run their algorithms at a much larger scale in a cluster setting. + +We are inspired by other open source efforts to provide ML systems, in particular +[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Spark’s +[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and +cluster sizes. --- End diff -- Do you think, we need this paragraph here? Add vision and roadmap for ML library to docs - Key: FLINK-2034 URL: https://issues.apache.org/jira/browse/FLINK-2034 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Theodore Vasiloudis Assignee: Theodore Vasiloudis Labels: ML Fix For: 0.9 We should have a document describing the vision of the Machine Learning library in Flink and an up to date roadmap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs
[ https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553939#comment-14553939 ] ASF GitHub Bot commented on FLINK-2034: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784588 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. --- End diff -- A good documentation should be self-evident IMHO. Guess we can merge this paragraph with the previous, can't we? 
Add vision and roadmap for ML library to docs - Key: FLINK-2034 URL: https://issues.apache.org/jira/browse/FLINK-2034 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Theodore Vasiloudis Assignee: Theodore Vasiloudis Labels: ML Fix For: 0.9 We should have a document describing the vision of the Machine Learning library in Flink and an up to date roadmap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784588 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. --- End diff -- A good documentation should be self-evident IMHO. Guess we can merge this paragraph with the previous, can't we? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30784817 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala --- @@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver { }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST) } + + /** Provides a solution for the given optimization problem * * @param data A Dataset of LabeledVector (label, features) pairs -* @param initWeights The initial weights that will be optimized +* @param initialWeights The initial weights that will be optimized * @return The weights, optimized for the provided data. */ override def optimize( data: DataSet[LabeledVector], -initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { -// TODO: Faster way to do this? -val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b) - -val numberOfIterations: Int = parameterMap(Iterations) +initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = { +val numberOfIterations: Int = parameters(Iterations) +// TODO(tvas): This looks out of place, why don't we get back an Option from +// parameters(ConvergenceThreshold)? 
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold) // Initialize weights -val initialWeightsDS: DataSet[WeightVector] = initWeights match { - // Ensure provided weight vector is a DenseVector - case Some(wvDS) = { -wvDS.map{wv = { - val denseWeights = wv.weights match { -case dv: DenseVector = dv -case sv: SparseVector = sv.toDenseVector +val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data) + +// Perform the iterations +val optimizedWeights = convergenceThresholdOption match { + // No convergence criterion + case None = +initialWeightsDS.iterate(numberOfIterations) { + weightVectorDS = { +SGDStep(data, weightVectorDS) } - WeightVector(denseWeights, wv.intercept) } - + case Some(convergence) = +/** Calculates the regularized loss, from the data and given weights **/ +def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]): +DataSet[Double] = { + data.map { +new LossCalculation + }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) +.reduce { +(left, right) = + val (leftLoss, leftCount) = left + val (rightLoss, rightCount) = right + (leftLoss + rightLoss, rightCount + leftCount) + } +.map{new RegularizedLossCalculation} +.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST) } - } - case None = createInitialWeightVector(dimensionsDS) -} - -// Perform the iterations -// TODO: Enable convergence stopping criterion, as in Multiple Linear regression -initialWeightsDS.iterate(numberOfIterations) { - weightVector = { -SGDStep(data, weightVector) - } +// We have to calculate for each weight vector the sum of squared residuals, +// and then sum them and apply regularization +val initialLossSumDS = lossCalculation(data, initialWeightsDS) + +// Combine weight vector with the current loss +val initialWeightsWithLossSum = initialWeightsDS. + crossWithTiny(initialLossSumDS).setParallelism(1) + +val resultWithLoss = initialWeightsWithLossSum. 
+ iterateWithTermination(numberOfIterations) { + weightsWithLossSum = + +// Extract weight vector and loss +val previousWeightsDS = weightsWithLossSum.map{_._1} +val previousLossSumDS = weightsWithLossSum.map{_._2} + +val currentWeightsDS = SGDStep(data, previousWeightsDS) + +val currentLossSumDS = lossCalculation(data, currentWeightsDS) + +// Check if the relative change in the loss is smaller than the +// convergence threshold. If yes, then terminate i.e. return empty termination data set +val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1). + filter{ + pair = { +val (previousLoss, currentLoss) = pair + +if (previousLoss = 0) { + false +} else
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784829 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. + +Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively +executes iterative processing algorithms which are common in ML. We plan to exploit the streaming +nature of Flink, and provide functionality designed specifically for data streams. + +FlinkML will allow data scientists to test their models locally and using subsets of data, and then +use the same code to run their algorithms at a much larger scale in a cluster setting. 
+ +We are inspired by other open source efforts to provide ML systems, in particular +[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Sparkâs +[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and +cluster sizes. + +We already have some of the building blocks for FlinkML in place, and will continue to extend the +library with more algorithms. An example of how simple it is to create a learning model in +FlinkML is given below: --- End diff -- Good idea to show how to use FlinkML. I would extend the example a little bit and put it into a separate section. Maybe making a small tutorial out of it including the things you have to add to the your pom etc. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs
[ https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553946#comment-14553946 ] ASF GitHub Bot commented on FLINK-2034: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784829 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. + +Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively +executes iterative processing algorithms which are common in ML. We plan to exploit the streaming +nature of Flink, and provide functionality designed specifically for data streams. 
+ +FlinkML will allow data scientists to test their models locally and using subsets of data, and then +use the same code to run their algorithms at a much larger scale in a cluster setting. + +We are inspired by other open source efforts to provide ML systems, in particular +[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Spark’s +[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and +cluster sizes. + +We already have some of the building blocks for FlinkML in place, and will continue to extend the +library with more algorithms. An example of how simple it is to create a learning model in +FlinkML is given below: --- End diff -- Good idea to show how to use FlinkML. I would extend the example a little bit and put it into a separate section. Maybe making a small tutorial out of it including the things you have to add to the your pom etc. Add vision and roadmap for ML library to docs - Key: FLINK-2034 URL: https://issues.apache.org/jira/browse/FLINK-2034 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Theodore Vasiloudis Assignee: Theodore Vasiloudis Labels: ML Fix For: 0.9 We should have a document describing the vision of the Machine Learning library in Flink and an up to date roadmap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784959 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. + +Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively +executes iterative processing algorithms which are common in ML. We plan to exploit the streaming +nature of Flink, and provide functionality designed specifically for data streams. + +FlinkML will allow data scientists to test their models locally and using subsets of data, and then +use the same code to run their algorithms at a much larger scale in a cluster setting. 
+ +We are inspired by other open source efforts to provide ML systems, in particular +[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Sparkâs +[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and +cluster sizes. + +We already have some of the building blocks for FlinkML in place, and will continue to extend the +library with more algorithms. An example of how simple it is to create a learning model in +FlinkML is given below: + +{% highlight scala %} +// LabelbedVector is a feature vector with a label (class or real value) +val data: DataSet[LabelVector] = ... + +val learner = MultipleLinearRegression() + +val parameters = ParameterMap() + .add(MultipleLinearRegression.Stepsize, 1.0) + .add(MultipleLinearRegression.Iterations, 10) + .add(MultipleLinearRegression.ConvergenceThreshold, 0.001) + +val model = learner.fit(data, parameters) +{% endhighlight %} + +The roadmap below can provide an indication of the algorithms we aim to implement in the coming +months. Items in **bold** have already been implemented: + + +* Pipelines of transformers and learners +* Data pre-processing + * **Feature scaling** + * **Polynomial feature base mapper** + * Feature hashing + * Feature extraction for text + * Dimensionality reduction +* Model selection and performance evaluation + * Cross-validation for model selection and evaluation +* Supervised learning + * Optimization framework +* **Stochastic Gradient Descent** +* L-BFGS + * Generalized Linear Models +* **Multiple linear regression** +* LASSO, Ridge regression +* Multi-class Logistic regression + * Random forests + * **Support Vector Machines** +* Unsupervised learning + * Clustering +* K-means clustering + * PCA +* Recommendation + * **ALS** +* Text analytics + * LDA +* Statistical estimation tools +* Distributed linear algebra +* Streaming ML --- End diff -- I like the idea of a roadmap so that people see where we're steering. 
But I would probably put it on a separate page where we can also elaborate a little bit what distributed linear algebra means and how it integrates. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs
[ https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553949#comment-14553949 ] ASF GitHub Bot commented on FLINK-2034: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30784959 --- Diff: docs/libs/ml/index.md --- @@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License. -- +The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink +community. Our goal is is to design and implement a system that is scalable and can deal with +problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond. +We call this library FlinkML. + +An important concern for developers of ML systems is the amount of glue code that developers are +forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML +is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to +tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same +program with FlinkML, allowing the development of robust pipelines without the need to use yet +another technology for data ingestion and data munging. + +Another goal for FlinkML is to make the library easy to use. To that end we will be providing +detailed documentation along with examples for every part of the system. Our aim is that developers +will be able to get started with writing their ML pipelines quickly, using familiar programming +concepts and terminology. + +Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively +executes iterative processing algorithms which are common in ML. We plan to exploit the streaming +nature of Flink, and provide functionality designed specifically for data streams. 
+ +FlinkML will allow data scientists to test their models locally and using subsets of data, and then +use the same code to run their algorithms at a much larger scale in a cluster setting. + +We are inspired by other open source efforts to provide ML systems, in particular +[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Spark’s +[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and +cluster sizes. + +We already have some of the building blocks for FlinkML in place, and will continue to extend the +library with more algorithms. An example of how simple it is to create a learning model in +FlinkML is given below: + +{% highlight scala %} +// LabelbedVector is a feature vector with a label (class or real value) +val data: DataSet[LabelVector] = ... + +val learner = MultipleLinearRegression() + +val parameters = ParameterMap() + .add(MultipleLinearRegression.Stepsize, 1.0) + .add(MultipleLinearRegression.Iterations, 10) + .add(MultipleLinearRegression.ConvergenceThreshold, 0.001) + +val model = learner.fit(data, parameters) +{% endhighlight %} + +The roadmap below can provide an indication of the algorithms we aim to implement in the coming +months. 
Items in **bold** have already been implemented: + + +* Pipelines of transformers and learners +* Data pre-processing + * **Feature scaling** + * **Polynomial feature base mapper** + * Feature hashing + * Feature extraction for text + * Dimensionality reduction +* Model selection and performance evaluation + * Cross-validation for model selection and evaluation +* Supervised learning + * Optimization framework +* **Stochastic Gradient Descent** +* L-BFGS + * Generalized Linear Models +* **Multiple linear regression** +* LASSO, Ridge regression +* Multi-class Logistic regression + * Random forests + * **Support Vector Machines** +* Unsupervised learning + * Clustering +* K-means clustering + * PCA +* Recommendation + * **ALS** +* Text analytics + * LDA +* Statistical estimation tools +* Distributed linear algebra +* Streaming ML --- End diff -- I like the idea of a roadmap so that people see where we're steering. But I would probably put it on a separate page where we can also elaborate a little bit what distributed linear algebra means and how it integrates. Add vision and roadmap for ML library to docs - Key: FLINK-2034 URL: https://issues.apache.org/jira/browse/FLINK-2034 Project: Flink
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30785023 --- Diff: docs/libs/ml/index.md --- @@ -30,7 +122,7 @@ under the License. /dependency {% endhighlight %} -## Algorithms +## Algorithm Documentation --- End diff -- I think, we should move the algorithms a little bit more prominently to the top of the page. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30785537 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala --- @@ -95,6 +95,7 @@ abstract class DiffRegularization extends Regularization { regParameter: Double) } +// TODO(tvas): I think NoRegularization should extend DiffRegularization --- End diff -- Because the idea is to disallow setting non-differentiable regularization in solver like L-BFGS which require differentiable loss, through type-matching. By setting NoRegularization to extend DiffRegularization we would be able to use it with L-BFGS --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553957#comment-14553957 ] ASF GitHub Bot commented on FLINK-1992: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30785537 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala --- @@ -95,6 +95,7 @@ abstract class DiffRegularization extends Regularization { regParameter: Double) } +// TODO(tvas): I think NoRegularization should extend DiffRegularization --- End diff -- Because the idea is to disallow setting non-differentiable regularization in solver like L-BFGS which require differentiable loss, through type-matching. By setting NoRegularization to extend DiffRegularization we would be able to use it with L-BFGS Add convergence criterion to SGD optimizer -- Key: FLINK-1992 URL: https://issues.apache.org/jira/browse/FLINK-1992 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Priority: Minor Labels: ML Fix For: 0.9 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It would be good to support a dynamic convergence criterion, too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553974#comment-14553974 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30786533 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala --- @@ -95,6 +95,7 @@ abstract class DiffRegularization extends Regularization { regParameter: Double) } +// TODO(tvas): I think NoRegularization should extend DiffRegularization --- End diff -- Sounds reasonable. Add convergence criterion to SGD optimizer -- Key: FLINK-1992 URL: https://issues.apache.org/jira/browse/FLINK-1992 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Theodore Vasiloudis Priority: Minor Labels: ML Fix For: 0.9 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It would be good to support a dynamic convergence criterion, too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553971#comment-14553971 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30786437

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
     }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
+
+
   /** Provides a solution for the given optimization problem
     *
     * @param data A Dataset of LabeledVector (label, features) pairs
-    * @param initWeights The initial weights that will be optimized
+    * @param initialWeights The initial weights that will be optimized
     * @return The weights, optimized for the provided data.
     */
   override def optimize(
     data: DataSet[LabeledVector],
-    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-    // TODO: Faster way to do this?
-    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-    val numberOfIterations: Int = parameterMap(Iterations)
+    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+    val numberOfIterations: Int = parameters(Iterations)
+    // TODO(tvas): This looks out of place, why don't we get back an Option from
+    // parameters(ConvergenceThreshold)?
+    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)

     // Initialize weights
-    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-      // Ensure provided weight vector is a DenseVector
-      case Some(wvDS) => {
-        wvDS.map{wv => {
-          val denseWeights = wv.weights match {
-            case dv: DenseVector => dv
-            case sv: SparseVector => sv.toDenseVector
+    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
+
+    // Perform the iterations
+    val optimizedWeights = convergenceThresholdOption match {
+      // No convergence criterion
+      case None =>
+        initialWeightsDS.iterate(numberOfIterations) {
+          weightVectorDS => {
+            SGDStep(data, weightVectorDS)
           }
-          WeightVector(denseWeights, wv.intercept)
         }
-
+      case Some(convergence) =>
+        /** Calculates the regularized loss, from the data and given weights **/
--- End diff --

What you've written there is ScalaDoc. Usually it is only used for classes/objects/packages/methods, not for comments inside method bodies.
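The TODO in the diff asks why `parameters(ConvergenceThreshold)` does not return an `Option`. The sketch below is a self-contained, simplified stand-in for FlinkML's `ParameterMap` (not its actual implementation): `apply` is for parameters that always have a value (explicit or default), while `get` returns an `Option` so an optional parameter such as the convergence threshold can signal absence to the caller.

```scala
// Minimal ParameterMap sketch: apply() returns a value or fails, get()
// surfaces absence as an Option. Names mirror the discussion; the
// implementation details are assumptions for illustration.
trait Parameter[T] { val defaultValue: Option[T] }
case object Iterations extends Parameter[Int] { val defaultValue = Some(10) }
case object ConvergenceThreshold extends Parameter[Double] { val defaultValue = None }

class ParameterMap(private val map: Map[Parameter[_], Any] = Map.empty) {
  def add[T](p: Parameter[T], v: T): ParameterMap = new ParameterMap(map + (p -> v))

  // get: explicit value, else the parameter's default, else None
  def get[T](p: Parameter[T]): Option[T] =
    map.get(p).map(_.asInstanceOf[T]).orElse(p.defaultValue)

  // apply: like get, but absence is an error rather than a value
  def apply[T](p: Parameter[T]): T =
    get(p).getOrElse(throw new NoSuchElementException(s"No value set for $p"))
}

object ParameterMapDemo {
  def main(args: Array[String]): Unit = {
    val params = new ParameterMap().add(Iterations, 100)
    println(params(Iterations))               // explicit value wins over default
    println(params.get(ConvergenceThreshold)) // None: the caller decides what "unset" means
  }
}
```

Under this reading, `parameters.get(ConvergenceThreshold)` returning an `Option` is deliberate: "no threshold set" selects the fixed-iteration path in `optimize`.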
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30786490

--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License.
-->
+The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue code that developers are
+forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same
+program with FlinkML, allowing the development of robust pipelines without the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end we will be providing
+detailed documentation along with examples for every part of the system. Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using familiar programming
+concepts and terminology.
--- End diff --

I think this is here to communicate to people who would also like to contribute to the library that we consider documentation an integral part of the library, not an afterthought.
Add vision and roadmap for ML library to docs - Key: FLINK-2034 URL: https://issues.apache.org/jira/browse/FLINK-2034 Project: Flink Issue Type: Improvement Components: Machine Learning Library Reporter: Theodore Vasiloudis Assignee: Theodore Vasiloudis Labels: ML Fix For: 0.9 We should have a document describing the vision of the Machine Learning library in Flink and an up to date roadmap. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30786639

--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations under the License.
-->
+The Machine Learning (ML) library for Flink is a new effort to bring scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue code that developers are
+forced to write [1] in the process of implementing an end-to-end ML system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily combined inside the same
+program with FlinkML, allowing the development of robust pipelines without the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end we will be providing
+detailed documentation along with examples for every part of the system. Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data streams.
+
+FlinkML will allow data scientists to test their models locally and using subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML pipelines, and Spark's
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will continue to extend the
+library with more algorithms. An example of how simple it is to create a learning model in
+FlinkML is given below:
+
+{% highlight scala %}
+// LabeledVector is a feature vector with a label (class or real value)
+val data: DataSet[LabeledVector] = ...
+
+val learner = MultipleLinearRegression()
+
+val parameters = ParameterMap()
+  .add(MultipleLinearRegression.Stepsize, 1.0)
+  .add(MultipleLinearRegression.Iterations, 10)
+  .add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+val model = learner.fit(data, parameters)
+{% endhighlight %}
+
+The roadmap below can provide an indication of the algorithms we aim to implement in the coming
+months. Items in **bold** have already been implemented:
+
+* Pipelines of transformers and learners
+* Data pre-processing
+  * **Feature scaling**
+  * **Polynomial feature base mapper**
+  * Feature hashing
+  * Feature extraction for text
+  * Dimensionality reduction
+* Model selection and performance evaluation
+  * Cross-validation for model selection and evaluation
+* Supervised learning
+  * Optimization framework
+    * **Stochastic Gradient Descent**
+    * L-BFGS
+  * Generalized Linear Models
+    * **Multiple linear regression**
+    * LASSO, Ridge regression
+    * Multi-class Logistic regression
+  * Random forests
+  * **Support Vector Machines**
+* Unsupervised learning
+  * Clustering
+    * K-means clustering
+  * PCA
+* Recommendation
+  * **ALS**
+* Text analytics
+  * LDA
+* Statistical estimation tools
+* Distributed linear algebra
+* Streaming ML
--- End diff --

That's a good idea, I'll add a link to the roadmap instead.
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/688#discussion_r30786723 --- Diff: docs/libs/ml/index.md --- @@ -30,7 +122,7 @@ under the License. /dependency {% endhighlight %} -## Algorithms +## Algorithm Documentation --- End diff -- Yeah this becomes a bit problematic now. This text is not really what you expect from the index of a library, so maybe it should be moved to its own section (Vision and roadmap?) with only a very brief introduction remaining on the index.
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553983#comment-14553983 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30786949

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
@@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
  * At the moment, the whole partition is used for SGD, making it effectively a batch gradient
  * descent. Once a sampling operator has been introduced, the algorithm can be optimized
  *
- * @param runParameters The parameters to tune the algorithm. Currently these include:
- *                      [[Solver.LossFunction]] for the loss function to be used,
- *                      [[Solver.RegularizationType]] for the type of regularization,
- *                      [[Solver.RegularizationParameter]] for the regularization parameter,
+ * The parameters to tune the algorithm are:
+ *                      [[Solver.LossFunctionParameter]] for the loss function to be used,
+ *                      [[Solver.RegularizationTypeParameter]] for the type of regularization,
+ *                      [[Solver.RegularizationValueParameter]] for the regularization parameter,
--- End diff --

Does IntelliJ suggest the wrong `LossFunction` type when you use it to set the parameter value in the `ParameterMap`? I see the point with the same type names from a programmer's perspective. As a user, though, it makes sense that the parameter is called `LossFunction`, for example. I think I'm slightly in favour of the shorter version.
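The naming trade-off in this thread can be made concrete with a small sketch. All names below are stand-ins, not FlinkML's actual classes: the point is only that when a parameter key and the value's type share the name `LossFunction`, client code must qualify one of them, which the `Parameter` suffix avoids at the cost of verbosity.

```scala
// Illustrative name-clash sketch (not Flink code).
object Types {
  abstract class LossFunction { def loss(residual: Double): Double }
  class SquaredLoss extends LossFunction { def loss(r: Double): Double = 0.5 * r * r }
}

object Params {
  // Short name: Params.LossFunction (the key) collides with
  // Types.LossFunction (the type) if both are imported unqualified.
  case object LossFunction { val name = "LossFunction" }
  // Suffixed alternative: no clash, but more verbose at the call site.
  case object LossFunctionParameter { val name = "LossFunctionParameter" }
}

object NamingDemo {
  def main(args: Array[String]): Unit = {
    import Types.SquaredLoss
    val fn: Types.LossFunction = new SquaredLoss // the type must be qualified here...
    println(Params.LossFunction.name)            // ...because the key owns the short name
    println(fn.loss(2.0))
  }
}
```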
[GitHub] flink pull request: Some fixes for Scala type analysis
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/669#issuecomment-104204248 It would be good to share a bit of information beyond `Some fixes for Scala type analysis` about what is actually changed. Then others could think about possible implications and give comments.
[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer
[ https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553989#comment-14553989 ] ASF GitHub Bot commented on FLINK-1992: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/692#discussion_r30787129

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
     }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
+
+
   /** Provides a solution for the given optimization problem
     *
     * @param data A Dataset of LabeledVector (label, features) pairs
-    * @param initWeights The initial weights that will be optimized
+    * @param initialWeights The initial weights that will be optimized
     * @return The weights, optimized for the provided data.
     */
   override def optimize(
     data: DataSet[LabeledVector],
-    initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-    // TODO: Faster way to do this?
-    val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-    val numberOfIterations: Int = parameterMap(Iterations)
+    initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+    val numberOfIterations: Int = parameters(Iterations)
+    // TODO(tvas): This looks out of place, why don't we get back an Option from
+    // parameters(ConvergenceThreshold)?
+    val convergenceThresholdOption = parameters.get(ConvergenceThreshold)

     // Initialize weights
-    val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-      // Ensure provided weight vector is a DenseVector
-      case Some(wvDS) => {
-        wvDS.map{wv => {
-          val denseWeights = wv.weights match {
-            case dv: DenseVector => dv
-            case sv: SparseVector => sv.toDenseVector
+    val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
+
+    // Perform the iterations
+    val optimizedWeights = convergenceThresholdOption match {
+      // No convergence criterion
+      case None =>
+        initialWeightsDS.iterate(numberOfIterations) {
+          weightVectorDS => {
+            SGDStep(data, weightVectorDS)
          }
-          WeightVector(denseWeights, wv.intercept)
        }
-
+      case Some(convergence) =>
+        /** Calculates the regularized loss, from the data and given weights **/
+        def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
+        DataSet[Double] = {
+          data.map {
+            new LossCalculation
+          }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+            .reduce {
+              (left, right) =>
+                val (leftLoss, leftCount) = left
+                val (rightLoss, rightCount) = right
+                (leftLoss + rightLoss, rightCount + leftCount)
+            }
+            .map{new RegularizedLossCalculation}
+            .withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+        }
-      }
-      case None => createInitialWeightVector(dimensionsDS)
-    }
-
-    // Perform the iterations
-    // TODO: Enable convergence stopping criterion, as in Multiple Linear regression
-    initialWeightsDS.iterate(numberOfIterations) {
-      weightVector => {
-        SGDStep(data, weightVector)
-      }
+        // We have to calculate for each weight vector the sum of squared residuals,
+        // and then sum them and apply regularization
+        val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+        // Combine weight vector with the current loss
+        val initialWeightsWithLossSum = initialWeightsDS.
+          crossWithTiny(initialLossSumDS).setParallelism(1)
+
+        val resultWithLoss = initialWeightsWithLossSum.
+          iterateWithTermination(numberOfIterations) {
+            weightsWithLossSum =>
+
+              // Extract weight vector and loss
+              val previousWeightsDS = weightsWithLossSum.map{_._1}
+              val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+              val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+              val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+              // Check if the relative change in the loss is smaller than the
+              // convergence threshold. If yes, then terminate i.e. return empty termination data set
+              val termination =
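The diff is cut off at the termination condition, but the comment above it states the criterion: stop when the relative change in the regularized loss falls below the convergence threshold (in `iterateWithTermination`, the loop ends when the termination data set becomes empty). The sketch below shows only that scalar criterion, outside Flink's iteration machinery; it is an illustration, not the PR's actual code.

```scala
// Relative-change convergence criterion, as described in the diff's comment:
// terminate when |previousLoss - currentLoss| / previousLoss < threshold.
object ConvergenceDemo {
  def converged(previousLoss: Double, currentLoss: Double, threshold: Double): Boolean =
    math.abs(previousLoss - currentLoss) / previousLoss < threshold

  def main(args: Array[String]): Unit = {
    // Loss halves each step, so the relative change is a constant 0.5:
    // a 0.6 threshold stops immediately, a 0.4 threshold never triggers.
    println(converged(1.0, 0.5, 0.6)) // true
    println(converged(1.0, 0.5, 0.4)) // false
  }
}
```

In the PR, this boolean would be expressed as a filter over the loss data sets, so that a converged run yields an empty `termination` set and ends the iteration early.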