[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30779810
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
   * At the moment, the whole partition is used for SGD, making it 
effectively a batch gradient
   * descent. Once a sampling operator has been introduced, the algorithm 
can be optimized
   *
-  * @param runParameters The parameters to tune the algorithm. Currently 
these include:
-  *  [[Solver.LossFunction]] for the loss function to 
be used,
-  *  [[Solver.RegularizationType]] for the type of 
regularization,
-  *  [[Solver.RegularizationParameter]] for the 
regularization parameter,
+  *  The parameters to tune the algorithm are:
+  *  [[Solver.LossFunctionParameter]] for the loss 
function to be used,
+  *  [[Solver.RegularizationTypeParameter]] for the 
type of regularization,
+  *  [[Solver.RegularizationValueParameter]] for the 
regularization parameter,
--- End diff --

Do we want to append the Parameter suffix to the parameter names? So far 
we haven't done that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: From stream to h base

2015-05-21 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/702#issuecomment-104170195
  
I like the idea of this, but we have to be aware that this breaks the 
existing streaming API by
renaming the `writeToFile()` method.

Since `writeToFile` is misleading anyways, I am in favor of this change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553820#comment-14553820
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30780043
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) => {
-wvDS.map{wv => {
-  val denseWeights = wv.weights match {
-case dv: DenseVector => dv
-case sv: SparseVector => sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =>
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS => {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =>
+/** Calculates the regularized loss, from the data and given 
weights **/
--- End diff --

One line comments should use `//`


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.
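 A rough sketch of what such a criterion could look like (the helper below is an 
 assumption for illustration, not the implementation):
 {code}
 // Hypothetical: stop iterating once the relative change of the regularized
 // loss between two consecutive iterations drops below a configured threshold.
 def hasConverged(previousLoss: Double, currentLoss: Double, threshold: Double): Boolean = {
   if (previousLoss <= 0) {
     false
   } else {
     math.abs(previousLoss - currentLoss) / previousLoss < threshold
   }
 }
 {code}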



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553830#comment-14553830
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30780327
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
--- End diff --

Indentation seems to be off.


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30780327
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
--- End diff --

Indentation seems to be off.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2050) Add pipelining mechanism for chainable transformers and estimators

2015-05-21 Thread Theodore Vasiloudis (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553840#comment-14553840
 ] 

Theodore Vasiloudis commented on FLINK-2050:


[~till.rohrmann] I'm going through the code now; could you take a look at the 
documentation when you get the chance?

 Add pipelining mechanism for chainable transformers and estimators
 --

 Key: FLINK-2050
 URL: https://issues.apache.org/jira/browse/FLINK-2050
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Till Rohrmann
  Labels: ML
 Fix For: 0.9


 The key concept of an easy-to-use ML library is the quick and simple 
 construction of data analysis pipelines. Scikit-learn's approach of defining 
 transformers and estimators seems to be a really good solution to this 
 problem. I propose to follow a similar path, because it makes FlinkML 
 flexible in terms of code reuse as well as easy to use for people coming 
 from Scikit-learn.
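 A hedged illustration of the intended usage (class and method names such as 
 MultipleLinearRegression and chainPredictor are assumptions here, not 
 necessarily the final API):
 {code}
 // Hypothetical sketch: scale the features, then fit and apply a predictor,
 // scikit-learn style. trainingDS and testDS are assumed to be existing DataSets.
 val scaler = StandardScaler().setMean(0.0).setStd(1.0)
 val mlr = MultipleLinearRegression()
 
 val pipeline = scaler.chainPredictor(mlr)
 pipeline.fit(trainingDS)
 val predictions = pipeline.predict(testDS)
 {code}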



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30781135
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
 ---
@@ -40,6 +44,32 @@ abstract class Solver extends Serializable with 
WithParameters {
 data: DataSet[LabeledVector],
 initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector]
 
+  /** Creates initial weights vector, creating a DataSet with a 
WeightVector element
+*
+* @param initialWeights An Option that may contain an initial set of 
weights
+* @param data The data for which we optimize the weights
+* @return A DataSet containing a single WeightVector element
+*/
+  def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]],
+ data: DataSet[LabeledVector]): DataSet[WeightVector] = {
+// TODO: Faster way to do this?
+val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
+
+initialWeights match {
+  // Ensure provided weight vector is a DenseVector
+  case Some(wvDS) =>
+wvDS.map { wv => {
+  val denseWeights = wv.weights match {
+case dv: DenseVector => dv
+case sv: SparseVector => sv.toDenseVector
+  }
+  WeightVector(denseWeights, wv.intercept)
--- End diff --

Indentation one level off.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553851#comment-14553851
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30781085
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala
 ---
@@ -95,6 +95,7 @@ abstract class DiffRegularization extends Regularization {
   regParameter: Double)
 }
 
+// TODO(tvas): I think NoRegularization should extend DiffRegularization
--- End diff --

Why do you think?
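One possible reading of the TODO, sketched with assumed member names (the actual 
Regularization interface may differ): if `NoRegularization` extended `DiffRegularization`, 
it could be used wherever a differentiable regularization is expected, contributing 
zero to both the loss and the gradient.

{code}
// Hypothetical sketch only; the overridden method names are assumptions.
class NoRegularization extends DiffRegularization {

  // Adds nothing to the loss
  override def regLoss(oldLoss: Double, weightVector: WeightVector, regParameter: Double)
    : Double = oldLoss

  // Leaves the gradient untouched
  override def regGradient(
      weightVector: WeightVector,
      gradient: WeightVector,
      regParameter: Double)
    : Unit = {}
}
{code}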


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30781429
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
 ---
@@ -205,6 +190,61 @@ class GradientDescentITSuite extends FlatSpec with 
Matchers with FlinkTestBase {
 
   }
 
-  // TODO: Need more corner cases
+  it should "terminate early if the convergence criterion is reached" in {
+// TODO(tvas): We need a better way to check the convergence of the 
weights.
+// Ideally we want to have a Breeze-like system, where the optimizers 
carry a history and that
+// can tell us whether we have converged and at which iteration
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+env.setParallelism(2)
+
+val sgdEarlyTerminate = GradientDescent()
+  .setConvergenceThreshold(1e2)
+  .setStepsize(1.0)
+  .setIterations(800)
+  .setLossFunction(SquaredLoss())
+  .setRegularizationType(NoRegularization())
+  .setRegularizationParameter(0.0)
+
+val inputDS = env.fromCollection(data)
+
+val weightDSEarlyTerminate = sgdEarlyTerminate.optimize(inputDS, None)
+
+val weightListEarly: Seq[WeightVector] = 
weightDSEarlyTerminate.collect()
+
+weightListEarly.size should equal(1)
+
+val weightVectorEarly: WeightVector = weightListEarly.head
+val weightsEarly = 
weightVectorEarly.weights.asInstanceOf[DenseVector].data
+val weight0Early = weightVectorEarly.intercept
+
+val sgdNoConvergence = GradientDescent()
+  .setStepsize(1.0)
+  .setIterations(800)
+  .setLossFunction(SquaredLoss())
+  .setRegularizationType(NoRegularization())
+  .setRegularizationParameter(0.0)
+
+
--- End diff --

two line breaks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553863#comment-14553863
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30781429
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
 ---
@@ -205,6 +190,61 @@ class GradientDescentITSuite extends FlatSpec with 
Matchers with FlinkTestBase {
 
   }
 
-  // TODO: Need more corner cases
+  it should "terminate early if the convergence criterion is reached" in {
+// TODO(tvas): We need a better way to check the convergence of the 
weights.
+// Ideally we want to have a Breeze-like system, where the optimizers 
carry a history and that
+// can tell us whether we have converged and at which iteration
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+env.setParallelism(2)
+
+val sgdEarlyTerminate = GradientDescent()
+  .setConvergenceThreshold(1e2)
+  .setStepsize(1.0)
+  .setIterations(800)
+  .setLossFunction(SquaredLoss())
+  .setRegularizationType(NoRegularization())
+  .setRegularizationParameter(0.0)
+
+val inputDS = env.fromCollection(data)
+
+val weightDSEarlyTerminate = sgdEarlyTerminate.optimize(inputDS, None)
+
+val weightListEarly: Seq[WeightVector] = 
weightDSEarlyTerminate.collect()
+
+weightListEarly.size should equal(1)
+
+val weightVectorEarly: WeightVector = weightListEarly.head
+val weightsEarly = 
weightVectorEarly.weights.asInstanceOf[DenseVector].data
+val weight0Early = weightVectorEarly.intercept
+
+val sgdNoConvergence = GradientDescent()
+  .setStepsize(1.0)
+  .setIterations(800)
+  .setLossFunction(SquaredLoss())
+  .setRegularizationType(NoRegularization())
+  .setRegularizationParameter(0.0)
+
+
--- End diff --

two line breaks.


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30781414
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/optimization/GradientDescentITSuite.scala
 ---
@@ -205,6 +190,61 @@ class GradientDescentITSuite extends FlatSpec with 
Matchers with FlinkTestBase {
 
   }
 
-  // TODO: Need more corner cases
+  it should "terminate early if the convergence criterion is reached" in {
+// TODO(tvas): We need a better way to check the convergence of the 
weights.
+// Ideally we want to have a Breeze-like system, where the optimizers 
carry a history and that
+// can tell us whether we have converged and at which iteration
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+env.setParallelism(2)
+
+val sgdEarlyTerminate = GradientDescent()
+  .setConvergenceThreshold(1e2)
+  .setStepsize(1.0)
+  .setIterations(800)
+  .setLossFunction(SquaredLoss())
+  .setRegularizationType(NoRegularization())
+  .setRegularizationParameter(0.0)
+
+val inputDS = env.fromCollection(data)
+
+val weightDSEarlyTerminate = sgdEarlyTerminate.optimize(inputDS, None)
+
+val weightListEarly: Seq[WeightVector] = 
weightDSEarlyTerminate.collect()
+
+weightListEarly.size should equal(1)
+
+val weightVectorEarly: WeightVector = weightListEarly.head
+val weightsEarly = 
weightVectorEarly.weights.asInstanceOf[DenseVector].data
+val weight0Early = weightVectorEarly.intercept
+
+val sgdNoConvergence = GradientDescent()
+  .setStepsize(1.0)
--- End diff --

If this solves the problem, then go for it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/692#issuecomment-104179672
  
Great work @thvasilo. I had some minor comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553874#comment-14553874
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/692#issuecomment-104179672
  
Great work @thvasilo. I had some minor comments.


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [Flink-1985] Streaming does not correctly forw...

2015-05-21 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/682#discussion_r30782105
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ---
@@ -93,6 +96,15 @@ public JobGraph createJobGraph(String jobName) {

configureCheckpointing();
 
+   try {
+   InstantiationUtil.writeObjectToConfig(
+   this.streamGraph.getExecutionConfig(),
+   this.jobGraph.getJobConfiguration(),
+   ExecutionConfig.CONFIG_KEY);
+   } catch (IOException e) {
+   throw new RuntimeException("Config object could not be written to Job Configuration: " + e);
--- End diff --

Can you replace the `+` with a `,`?
`Throwable.toString()` does not really give useful output.
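The suggestion is about exception chaining: passing the caught exception as the 
constructor's cause argument keeps its full stack trace, whereas concatenating it into 
the message only keeps its `toString()`. A minimal sketch of the suggested form (catch 
block only, trimmed from the hunk above):

{code}
} catch (IOException e) {
    // Pass the exception as the cause instead of appending its toString()
    throw new RuntimeException("Config object could not be written to Job Configuration: ", e);
}
{code}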


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2003) Building on some encrypted filesystems leads to File name too long error

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2003?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553880#comment-14553880
 ] 

ASF GitHub Bot commented on FLINK-2003:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/690#issuecomment-104181351
  
I would actually say both: the README and the "How to contribute" section.


 Building on some encrypted filesystems leads to File name too long error
 --

 Key: FLINK-2003
 URL: https://issues.apache.org/jira/browse/FLINK-2003
 Project: Flink
  Issue Type: Bug
  Components: Build System
Reporter: Theodore Vasiloudis
Priority: Minor
  Labels: build, starter

 The class names generated by the build system can be too long.
 Some encrypted filesystems, including encfs (which Ubuntu uses), cannot 
 create such long filenames.
 This is the same as this [Spark 
 issue|https://issues.apache.org/jira/browse/SPARK-4820].
 The workaround (taken from the linked issue) is to add the following to the 
 compiler options in Maven: 
 {code}
 +  <arg>-Xmax-classfile-name</arg>
 +  <arg>128</arg>
 {code}
 And in SBT add:
 {code}
 +scalacOptions in Compile ++= Seq("-Xmax-classfile-name", "128"),
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2003] Building on some encrypted filesy...

2015-05-21 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/690#issuecomment-104181351
  
I would actually say both: the README and the "How to contribute" section.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2050) Add pipelining mechanism for chainable transformers and estimators

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553896#comment-14553896
 ] 

ASF GitHub Bot commented on FLINK-2050:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/704#discussion_r30783065
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -22,38 +22,47 @@ import breeze.linalg
 import breeze.numerics.sqrt
 import breeze.numerics.sqrt._
 import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
 import org.apache.flink.ml.math.Breeze._
-import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
+import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation, 
Transformer}
 import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
 
+import scala.reflect.ClassTag
+
 /** Scales observations, so that all features have a user-specified mean 
and standard deviation.
   * By default for [[StandardScaler]] transformer mean=0.0 and std=1.0.
   *
-  * This transformer takes a [[Vector]] of values and maps it to a
-  * scaled [[Vector]] such that each feature has a user-specified mean and 
standard deviation.
+  * This transformer takes a subtype of  [[Vector]] of values and maps it 
to a
+  * scaled subtype of [[Vector]] such that each feature has a 
user-specified mean and standard
+  * deviation.
   *
   * This transformer can be prepended to all [[Transformer]] and
-  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
-  * [[Vector]].
+  * [[org.apache.flink.ml.pipeline.Predictor]] implementations which 
expect as input a subtype
+  * of [[Vector]].
   *
   * @example
   *  {{{
   *val trainingDS: DataSet[Vector] = env.fromCollection(data)
   *val transformer = StandardScaler().setMean(10.0).setStd(2.0)
   *
-  *transformer.transform(trainingDS)
+  *transformer.fit(trainingDS)
+  *val transformedDS = transformer.transform(trainingDS)
   *  }}}
   *
   * =Parameters=
   *
-  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
-  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * - [[Mean]]: The mean value of transformed data set; by default equal 
to 0
+  * - [[Std]]: The standard deviation of the transformed data set; by 
default
   * equal to 1
--- End diff --

Why use just the top-level type here, but the fully qualified one in the 
ALS docstring?


 Add pipelining mechanism for chainable transformers and estimators
 --

 Key: FLINK-2050
 URL: https://issues.apache.org/jira/browse/FLINK-2050
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Till Rohrmann
  Labels: ML
 Fix For: 0.9


 The key concept of an easy-to-use ML library is the quick and simple 
 construction of data analysis pipelines. Scikit-learn's approach of defining 
 transformers and estimators seems to be a really good solution to this 
 problem. I propose to follow a similar path, because it makes FlinkML 
 flexible in terms of code reuse as well as easy to use for people coming 
 from Scikit-learn.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2050] Introduces new pipelining mechani...

2015-05-21 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/704#discussion_r30783065
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -22,38 +22,47 @@ import breeze.linalg
 import breeze.numerics.sqrt
 import breeze.numerics.sqrt._
 import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
 import org.apache.flink.ml.math.Breeze._
-import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
+import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation, 
Transformer}
 import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
 
+import scala.reflect.ClassTag
+
 /** Scales observations, so that all features have a user-specified mean 
and standard deviation.
   * By default for [[StandardScaler]] transformer mean=0.0 and std=1.0.
   *
-  * This transformer takes a [[Vector]] of values and maps it to a
-  * scaled [[Vector]] such that each feature has a user-specified mean and 
standard deviation.
+  * This transformer takes a subtype of  [[Vector]] of values and maps it 
to a
+  * scaled subtype of [[Vector]] such that each feature has a 
user-specified mean and standard
+  * deviation.
   *
   * This transformer can be prepended to all [[Transformer]] and
-  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
-  * [[Vector]].
+  * [[org.apache.flink.ml.pipeline.Predictor]] implementations which 
expect as input a subtype
+  * of [[Vector]].
   *
   * @example
   *  {{{
   *val trainingDS: DataSet[Vector] = env.fromCollection(data)
   *val transformer = StandardScaler().setMean(10.0).setStd(2.0)
   *
-  *transformer.transform(trainingDS)
+  *transformer.fit(trainingDS)
+  *val transformedDS = transformer.transform(trainingDS)
   *  }}}
   *
   * =Parameters=
   *
-  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
-  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * - [[Mean]]: The mean value of transformed data set; by default equal 
to 0
+  * - [[Std]]: The standard deviation of the transformed data set; by 
default
   * equal to 1
--- End diff --

Why use just the top-level type here, but the fully qualified one in the 
ALS docstring?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553894#comment-14553894
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30783024
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
--- End diff --

maybe we should add a headline that makes clear that this is a roadmap


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.9


 We should have a document describing the vision of the Machine Learning 
 library in Flink and an up to date roadmap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30783024
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
--- End diff --

maybe we should add a headline that makes clear that this is a roadmap


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r3078
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
--- End diff --

The `apply` method unpacks the option value, whereas the `get` method 
returns an `Option`. Maybe we should rename it to `getOption`.
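For illustration, the difference as it shows up in the code above (a sketch that 
assumes the `ParameterMap` behaviour described here):

{code}
// apply unpacks the stored value directly ...
val numberOfIterations: Int = parameters(Iterations)
// ... while get returns an Option that the caller has to handle explicitly.
val convergenceThresholdOption: Option[Double] = parameters.get(ConvergenceThreshold)

convergenceThresholdOption match {
  case Some(threshold) => println(s"Iterating until the loss change drops below $threshold")
  case None => println("Iterating for a fixed number of iterations")
}
{code}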


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553817#comment-14553817
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r3078
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
--- End diff --

The `apply` method unpacks the option value, whereas the `get` method 
returns an `Option`. Maybe we should rename it to `getOption`.


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30780043
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
--- End diff --

One line comments should use `//`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: From stream to h base

2015-05-21 Thread fhueske
Github user fhueske commented on the pull request:

https://github.com/apache/flink/pull/702#issuecomment-104171609
  
How about adding a `writeToFile()` method that accepts a `FileInputFormat`?
This would only break code where the method is used with a 
non-FileInputFormat.
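A hedged sketch of what such an overload could look like on the streaming API 
(assuming a file output format, rather than an input format, is what a sink needs 
here; the trait and method names below are assumptions, not the actual API):

{code}
import org.apache.flink.api.common.io.{FileOutputFormat, OutputFormat}

// Hypothetical signatures only.
trait StreamSink[T] {
  // Restricted to file-based formats, so the name stays accurate.
  def writeToFile(format: FileOutputFormat[T], path: String): Unit

  // Generic sink for arbitrary output formats, e.g. an HBase format.
  def writeUsingOutputFormat(format: OutputFormat[T]): Unit
}
{code}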


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30780392
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
 }
-  }
-  case None = createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear 
regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector = {
-SGDStep(data, weightVector)
-  }
+// We have to calculate for each weight vector the sum of squared 
residuals,
+// and then sum them and apply regularization
+val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+// Combine weight vector with the current loss
+val initialWeightsWithLossSum = initialWeightsDS.
+  crossWithTiny(initialLossSumDS).setParallelism(1)
+
+val resultWithLoss = initialWeightsWithLossSum.
+  iterateWithTermination(numberOfIterations) {
+  weightsWithLossSum =
+
+// Extract weight vector and loss
+val previousWeightsDS = weightsWithLossSum.map{_._1}
+val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+// Check if the relative change in the loss is smaller than the
+// convergence threshold. If yes, then terminate i.e. return 
empty termination data set
+val termination = 
previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
+  filter{
+  pair = {
+val (previousLoss, currentLoss) = pair
+
+if (previousLoss = 0) {
+  false
+} 

[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30780559
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
 }
-  }
-  case None = createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear 
regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector = {
-SGDStep(data, weightVector)
-  }
+// We have to calculate for each weight vector the sum of squared 
residuals,
+// and then sum them and apply regularization
+val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+// Combine weight vector with the current loss
+val initialWeightsWithLossSum = initialWeightsDS.
+  crossWithTiny(initialLossSumDS).setParallelism(1)
+
+val resultWithLoss = initialWeightsWithLossSum.
+  iterateWithTermination(numberOfIterations) {
+  weightsWithLossSum =
+
+// Extract weight vector and loss
+val previousWeightsDS = weightsWithLossSum.map{_._1}
+val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+// Check if the relative change in the loss is smaller than the
+// convergence threshold. If yes, then terminate i.e. return 
empty termination data set
+val termination = 
previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
+  filter{
+  pair = {
+val (previousLoss, currentLoss) = pair
+
+if (previousLoss = 0) {
+  false
+} 

[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553835#comment-14553835
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30780614
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
 }
-  }
-  case None = createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear 
regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector = {
-SGDStep(data, weightVector)
-  }
+// We have to calculate for each weight vector the sum of squared 
residuals,
+// and then sum them and apply regularization
+val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+// Combine weight vector with the current loss
+val initialWeightsWithLossSum = initialWeightsDS.
+  crossWithTiny(initialLossSumDS).setParallelism(1)
+
+val resultWithLoss = initialWeightsWithLossSum.
+  iterateWithTermination(numberOfIterations) {
+  weightsWithLossSum =
+
+// Extract weight vector and loss
+val previousWeightsDS = weightsWithLossSum.map{_._1}
+val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+// Check if the relative change in the loss is smaller than the
+// convergence threshold. If yes, then terminate i.e. return 
empty termination data set
+val termination = 

[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30780625
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
 }
-  }
-  case None = createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear 
regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector = {
-SGDStep(data, weightVector)
-  }
+// We have to calculate for each weight vector the sum of squared 
residuals,
+// and then sum them and apply regularization
+val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+// Combine weight vector with the current loss
+val initialWeightsWithLossSum = initialWeightsDS.
+  crossWithTiny(initialLossSumDS).setParallelism(1)
+
+val resultWithLoss = initialWeightsWithLossSum.
+  iterateWithTermination(numberOfIterations) {
+  weightsWithLossSum =
+
+// Extract weight vector and loss
+val previousWeightsDS = weightsWithLossSum.map{_._1}
+val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+// Check if the relative change in the loss is smaller than the
+// convergence threshold. If yes, then terminate i.e. return 
empty termination data set
+val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
+  filter{
+    pair => {
+      val (previousLoss, currentLoss) = pair
+
+      if (previousLoss <= 0) {
+        false
+      }

[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30780614
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
 }
-  }
-  case None = createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear 
regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector = {
-SGDStep(data, weightVector)
-  }
+// We have to calculate for each weight vector the sum of squared 
residuals,
+// and then sum them and apply regularization
+val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+// Combine weight vector with the current loss
+val initialWeightsWithLossSum = initialWeightsDS.
+  crossWithTiny(initialLossSumDS).setParallelism(1)
+
+val resultWithLoss = initialWeightsWithLossSum.
+  iterateWithTermination(numberOfIterations) {
+  weightsWithLossSum =
+
+// Extract weight vector and loss
+val previousWeightsDS = weightsWithLossSum.map{_._1}
+val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+// Check if the relative change in the loss is smaller than the
+// convergence threshold. If yes, then terminate i.e. return 
empty termination data set
+val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
+  filter{
+    pair => {
+      val (previousLoss, currentLoss) = pair
+
+      if (previousLoss <= 0) {
+        false
+      }

[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553834#comment-14553834
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30780559
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
 }
-  }
-  case None = createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear 
regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector = {
-SGDStep(data, weightVector)
-  }
+// We have to calculate for each weight vector the sum of squared 
residuals,
+// and then sum them and apply regularization
+val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+// Combine weight vector with the current loss
+val initialWeightsWithLossSum = initialWeightsDS.
+  crossWithTiny(initialLossSumDS).setParallelism(1)
+
+val resultWithLoss = initialWeightsWithLossSum.
+  iterateWithTermination(numberOfIterations) {
+  weightsWithLossSum =
+
+// Extract weight vector and loss
+val previousWeightsDS = weightsWithLossSum.map{_._1}
+val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+// Check if the relative change in the loss is smaller than the
+// convergence threshold. If yes, then terminate i.e. return 
empty termination data set
+val termination = 

[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30781162
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
 ---
@@ -106,7 +139,7 @@ object Solver {
   * See [[https://en.wikipedia.org/wiki/Iterative_method Iterative Methods 
on Wikipedia]] for more
   * info
   */
-abstract class IterativeSolver extends Solver {
+abstract class IterativeSolver() extends Solver() {
--- End diff --

Do we need the empty parentheses?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
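
(A toy illustration of the parentheses question, using made-up class names rather than the FlinkML ones: a Scala class without constructor parameters can be declared and extended with or without `()`, so the empty parentheses are purely a style choice.)

```scala
// Toy classes, not FlinkML code: both forms compile and behave identically.
abstract class SolverLike
class PlainSolver extends SolverLike       // without ()
class ParenSolver() extends SolverLike     // with ()

object EmptyParensDemo extends App {
  println(new PlainSolver().getClass.getSimpleName)  // PlainSolver
  println(new ParenSolver().getClass.getSimpleName)  // ParenSolver
}
```
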


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30781214
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
 ---
@@ -118,6 +151,44 @@ abstract class IterativeSolver extends Solver {
 parameters.add(Stepsize, stepsize)
 this
   }
+
+  def setConvergenceThreshold(convergenceThreshold: Double): 
IterativeSolver = {
+parameters.add(ConvergenceThreshold, convergenceThreshold)
+this
+  }
+
+  /** Mapping function that calculates the weight gradients from the data.
+*
+*/
+  protected class GradientCalculation extends
--- End diff --

`extends` goes on the next line with 4 spaces of indentation if a line break is necessary here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
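
(To show how the new setter surfaces to users, here is a hedged usage sketch. `setIterations` and the companion `GradientDescent()` apply are assumptions based on the surrounding IterativeSolver code, not lines quoted in this diff.)

```scala
import org.apache.flink.ml.optimization.GradientDescent

object ConvergenceThresholdUsageSketch {
  // Hypothetical fluent configuration of the SGD solver.
  val sgd = GradientDescent()
    .setStepsize(0.1)               // learning rate
    .setIterations(100)             // upper bound on the number of iterations
    .setConvergenceThreshold(1e-4)  // stop early once the relative loss change drops below this
}
```
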


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553854#comment-14553854
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30781162
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
 ---
@@ -106,7 +139,7 @@ object Solver {
   * See [[https://en.wikipedia.org/wiki/Iterative_method Iterative Methods 
on Wikipedia]] for more
   * info
   */
-abstract class IterativeSolver extends Solver {
+abstract class IterativeSolver() extends Solver() {
--- End diff --

Do we need the empty parentheses?


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Some fixes for Scala type analysis

2015-05-21 Thread vasia
Github user vasia commented on the pull request:

https://github.com/apache/flink/pull/669#issuecomment-104174485
  
Hey @aljoscha,
it seems this one is needed for the Gelly Scala API :-)
Shall I go ahead and merge it?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [Flink-1985] Streaming does not correctly forw...

2015-05-21 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/682#discussion_r30782073
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 ---
@@ -93,6 +96,15 @@ public JobGraph createJobGraph(String jobName) {

configureCheckpointing();
 
+   try {
+   InstantiationUtil.writeObjectToConfig(
+   this.streamGraph.getExecutionConfig(),
+   this.jobGraph.getJobConfiguration(),
+   ExecutionConfig.CONFIG_KEY);
+   } catch (IOException e) {
+   throw new RuntimeException("Config object could not be written to Job Configuration: " + e);
--- End diff --

The indentation is off here ;)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [Flink-1985] Streaming does not correctly forw...

2015-05-21 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/682#issuecomment-104180508
  
I have some minor remarks which need some attention, otherwise good to 
merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: Some fixes for Scala type analysis

2015-05-21 Thread aljoscha
Github user aljoscha commented on the pull request:

https://github.com/apache/flink/pull/669#issuecomment-104182293
  
I didn't want to merge it without any comments. But please, go ahead. 
:smile: 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553811#comment-14553811
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30779835
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
   * At the moment, the whole partition is used for SGD, making it 
effectively a batch gradient
   * descent. Once a sampling operator has been introduced, the algorithm 
can be optimized
   *
-  * @param runParameters The parameters to tune the algorithm. Currently 
these include:
-  *  [[Solver.LossFunction]] for the loss function to 
be used,
-  *  [[Solver.RegularizationType]] for the type of 
regularization,
-  *  [[Solver.RegularizationParameter]] for the 
regularization parameter,
+  *  The parameters to tune the algorithm are:
+  *  [[Solver.LossFunctionParameter]] for the loss 
function to be used,
+  *  [[Solver.RegularizationTypeParameter]] for the 
type of regularization,
+  *  [[Solver.RegularizationValueParameter]] for the 
regularization parameter,
   *  [[IterativeSolver.Iterations]] for the maximum 
number of iteration,
   *  [[IterativeSolver.Stepsize]] for the learning 
rate used.
+  *  [[IterativeSolver.ConvergenceThreshold]] when 
provided the algorithm will
+  *  stop the iterations if the change in the value of 
the objective
+  *  function between successive iterations is smaller than this value.
   */
-class GradientDescent(runParameters: ParameterMap) extends IterativeSolver 
{
+class GradientDescent() extends IterativeSolver() {
--- End diff --

Do we need the parentheses?


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553808#comment-14553808
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30779810
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
   * At the moment, the whole partition is used for SGD, making it 
effectively a batch gradient
   * descent. Once a sampling operator has been introduced, the algorithm 
can be optimized
   *
-  * @param runParameters The parameters to tune the algorithm. Currently 
these include:
-  *  [[Solver.LossFunction]] for the loss function to 
be used,
-  *  [[Solver.RegularizationType]] for the type of 
regularization,
-  *  [[Solver.RegularizationParameter]] for the 
regularization parameter,
+  *  The parameters to tune the algorithm are:
+  *  [[Solver.LossFunctionParameter]] for the loss 
function to be used,
+  *  [[Solver.RegularizationTypeParameter]] for the 
type of regularization,
+  *  [[Solver.RegularizationValueParameter]] for the 
regularization parameter,
--- End diff --

Do we want to append the Parameter suffix to the parameter names? So far 
we haven't done that.


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: From stream to h base

2015-05-21 Thread HilmiYildirim
Github user HilmiYildirim commented on the pull request:

https://github.com/apache/flink/pull/702#issuecomment-104171850
  
Good idea. I am working on it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: From stream to h base

2015-05-21 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/702#issuecomment-104171814
  
That is a very nice idea!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553832#comment-14553832
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30780392
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
 }
-  }
-  case None = createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear 
regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector = {
-SGDStep(data, weightVector)
-  }
+// We have to calculate for each weight vector the sum of squared 
residuals,
+// and then sum them and apply regularization
+val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+// Combine weight vector with the current loss
+val initialWeightsWithLossSum = initialWeightsDS.
+  crossWithTiny(initialLossSumDS).setParallelism(1)
+
+val resultWithLoss = initialWeightsWithLossSum.
+  iterateWithTermination(numberOfIterations) {
+  weightsWithLossSum =
+
+// Extract weight vector and loss
+val previousWeightsDS = weightsWithLossSum.map{_._1}
+val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+// Check if the relative change in the loss is smaller than the
+// convergence threshold. If yes, then terminate i.e. return 
empty termination data set
+val termination = 

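(Since the quoted diff leans on it, here is a tiny, self-contained illustration of Flink's `iterateWithTermination` pattern; the halving example is purely illustrative and has nothing to do with SGD.)

```scala
import org.apache.flink.api.scala._

object IterateWithTerminationSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val initial: DataSet[Double] = env.fromElements(1.0)

    // The step function returns (next solution set, termination set); the loop stops
    // at maxIterations or as soon as the termination set becomes empty.
    val result = initial.iterateWithTermination(100) { current =>
      val next = current.map(_ / 2.0)             // the "update" step
      val notYetConverged = next.filter(_ > 0.01) // empty once we drop below 0.01
      (next, notYetConverged)
    }

    result.print()  // the first value <= 0.01 reached within the 100-iteration budget
  }
}
```
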
[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30781085
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala
 ---
@@ -95,6 +95,7 @@ abstract class DiffRegularization extends Regularization {
   regParameter: Double)
 }
 
+// TODO(tvas): I think NoRegularization should extend DiffRegularization
--- End diff --

Why do you think so?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
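
(To make the question concrete: a "no regularization" penalty is trivially differentiable, since both its value and its gradient contribution are identically zero. A toy sketch follows, with invented signatures rather than the actual FlinkML classes.)

```scala
// Illustrative only; names and signatures are assumptions, not the FlinkML definitions.
abstract class DiffRegularizationSketch {
  def regularizationValue(weights: Array[Double], regParameter: Double): Double
  def regularizationGradient(weights: Array[Double], regParameter: Double): Array[Double]
}

object NoRegularizationSketch extends DiffRegularizationSketch {
  override def regularizationValue(weights: Array[Double], regParameter: Double): Double = 0.0
  override def regularizationGradient(weights: Array[Double], regParameter: Double): Array[Double] =
    Array.fill(weights.length)(0.0)  // contributes nothing to the loss gradient
}
```
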


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553855#comment-14553855
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30781214
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
 ---
@@ -118,6 +151,44 @@ abstract class IterativeSolver extends Solver {
 parameters.add(Stepsize, stepsize)
 this
   }
+
+  def setConvergenceThreshold(convergenceThreshold: Double): 
IterativeSolver = {
+parameters.add(ConvergenceThreshold, convergenceThreshold)
+this
+  }
+
+  /** Mapping function that calculates the weight gradients from the data.
+*
+*/
+  protected class GradientCalculation extends
--- End diff --

`extends` goes on the next line with 4 spaces of indentation if a line break is necessary here.


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2050) Add pipelining mechanism for chainable transformers and estimators

2015-05-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553879#comment-14553879
 ] 

Till Rohrmann commented on FLINK-2050:
--

Yes, of course. Where do I find it?




 Add pipelining mechanism for chainable transformers and estimators
 --

 Key: FLINK-2050
 URL: https://issues.apache.org/jira/browse/FLINK-2050
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Till Rohrmann
  Labels: ML
 Fix For: 0.9


 The key concept of an easy to use ML library is the quick and simple 
 construction of data analysis pipelines. Scikit-learn's approach to define 
 transformers and estimators seems to be a really good solution to this 
 problem. I propose to follow a similar path, because it makes FlinkML 
 flexible in terms of code reuse as well as easy for people coming from 
 Scikit-learn to use the FlinkML.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/688#issuecomment-104185370
  
Thank you for writing this. Finally I get a better understanding of the 
overall status of the FlinkML stuff ;)

+1 to merge.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553907#comment-14553907
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/688#issuecomment-104185370
  
Thank you for writing this. Finally I get a better understanding of the 
overall status of the FlinkML stuff ;)

+1 to merge.


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.9


 We should have a document describing the vision of the Machine Learning 
 library in Flink and an up to date roadmap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2050) Add pipelining mechanism for chainable transformers and estimators

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553909#comment-14553909
 ] 

ASF GitHub Bot commented on FLINK-2050:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/704#discussion_r30783847
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -22,38 +22,47 @@ import breeze.linalg
 import breeze.numerics.sqrt
 import breeze.numerics.sqrt._
 import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
 import org.apache.flink.ml.math.Breeze._
-import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
+import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation, 
Transformer}
 import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
 
+import scala.reflect.ClassTag
+
 /** Scales observations, so that all features have a user-specified mean 
and standard deviation.
   * By default for [[StandardScaler]] transformer mean=0.0 and std=1.0.
   *
-  * This transformer takes a [[Vector]] of values and maps it to a
-  * scaled [[Vector]] such that each feature has a user-specified mean and 
standard deviation.
+  * This transformer takes a subtype of  [[Vector]] of values and maps it 
to a
+  * scaled subtype of [[Vector]] such that each feature has a 
user-specified mean and standard
+  * deviation.
   *
   * This transformer can be prepended to all [[Transformer]] and
-  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
-  * [[Vector]].
+  * [[org.apache.flink.ml.pipeline.Predictor]] implementations which 
expect as input a subtype
+  * of [[Vector]].
   *
   * @example
   *  {{{
   *val trainingDS: DataSet[Vector] = env.fromCollection(data)
   *val transformer = StandardScaler().setMean(10.0).setStd(2.0)
   *
-  *transformer.transform(trainingDS)
+  *transformer.fit(trainingDS)
+  *val transformedDS = transformer.transform(trainingDS)
   *  }}}
   *
   * =Parameters=
   *
-  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
-  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * - [[Mean]]: The mean value of transformed data set; by default equal 
to 0
+  * - [[Std]]: The standard deviation of the transformed data set; by 
default
   * equal to 1
--- End diff --

IntelliJ did it that way. I haven't figured out yet whether the full path 
is necessary or not.


 Add pipelining mechanism for chainable transformers and estimators
 --

 Key: FLINK-2050
 URL: https://issues.apache.org/jira/browse/FLINK-2050
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Till Rohrmann
  Labels: ML
 Fix For: 0.9


 The key concept of an easy to use ML library is the quick and simple 
 construction of data analysis pipelines. Scikit-learn's approach to define 
 transformers and estimators seems to be a really good solution to this 
 problem. I propose to follow a similar path, because it makes FlinkML 
 flexible in terms of code reuse as well as easy for people coming from 
 Scikit-learn to use the FlinkML.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30783908
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
--- End diff --

I'm following the [Scala style guide](http://docs.scala-lang.org/style/scaladoc.html#general_style) here; should I change this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553911#comment-14553911
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30783908
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
--- End diff --

I'm following the [Scala style guide](http://docs.scala-lang.org/style/scaladoc.html#general_style) here; should I change this?


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2050] Introduces new pipelining mechani...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/704#discussion_r30783847
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/preprocessing/StandardScaler.scala
 ---
@@ -22,38 +22,47 @@ import breeze.linalg
 import breeze.numerics.sqrt
 import breeze.numerics.sqrt._
 import org.apache.flink.api.common.functions._
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala._
 import org.apache.flink.configuration.Configuration
-import org.apache.flink.ml.common.{Parameter, ParameterMap, Transformer}
+import org.apache.flink.ml.common.{LabeledVector, Parameter, ParameterMap}
 import org.apache.flink.ml.math.Breeze._
-import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.math.{BreezeVectorConverter, Vector}
+import org.apache.flink.ml.pipeline.{TransformOperation, FitOperation, 
Transformer}
 import org.apache.flink.ml.preprocessing.StandardScaler.{Mean, Std}
 
+import scala.reflect.ClassTag
+
 /** Scales observations, so that all features have a user-specified mean 
and standard deviation.
   * By default for [[StandardScaler]] transformer mean=0.0 and std=1.0.
   *
-  * This transformer takes a [[Vector]] of values and maps it to a
-  * scaled [[Vector]] such that each feature has a user-specified mean and 
standard deviation.
+  * This transformer takes a subtype of  [[Vector]] of values and maps it 
to a
+  * scaled subtype of [[Vector]] such that each feature has a 
user-specified mean and standard
+  * deviation.
   *
   * This transformer can be prepended to all [[Transformer]] and
-  * [[org.apache.flink.ml.common.Learner]] implementations which expect an 
input of
-  * [[Vector]].
+  * [[org.apache.flink.ml.pipeline.Predictor]] implementations which 
expect as input a subtype
+  * of [[Vector]].
   *
   * @example
   *  {{{
   *val trainingDS: DataSet[Vector] = env.fromCollection(data)
   *val transformer = StandardScaler().setMean(10.0).setStd(2.0)
   *
-  *transformer.transform(trainingDS)
+  *transformer.fit(trainingDS)
+  *val transformedDS = transformer.transform(trainingDS)
   *  }}}
   *
   * =Parameters=
   *
-  * - [[StandardScaler.Mean]]: The mean value of transformed data set; by 
default equal to 0
-  * - [[StandardScaler.Std]]: The standard deviation of the transformed 
data set; by default
+  * - [[Mean]]: The mean value of transformed data set; by default equal 
to 0
+  * - [[Std]]: The standard deviation of the transformed data set; by 
default
   * equal to 1
--- End diff --

IntelliJ did it that way. I haven't figured out yet whether the full path 
is necessary or not.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
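
(A slightly expanded version of the ScalaDoc example quoted in this diff, showing the fit-then-transform flow end to end; the sample data and the wrapping object are illustrative assumptions.)

```scala
import org.apache.flink.api.scala._
import org.apache.flink.ml.math.{DenseVector, Vector}
import org.apache.flink.ml.preprocessing.StandardScaler

object StandardScalerSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val trainingDS: DataSet[Vector] = env.fromElements(
      DenseVector(1.0, 2.0), DenseVector(3.0, 6.0), DenseVector(5.0, 10.0))

    val transformer = StandardScaler().setMean(10.0).setStd(2.0)

    transformer.fit(trainingDS)                            // learn per-feature mean and std
    val transformedDS = transformer.transform(trainingDS)  // rescale to mean 10.0, std 2.0

    transformedDS.print()
  }
}
```
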


[GitHub] flink pull request: [FLINK-2050] Introduces new pipelining mechani...

2015-05-21 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/704#issuecomment-104186229
  
Documentation :white_check_mark: 
Scaladocs :white_check_mark: 
Tests :white_check_mark: 
No code outside of flink-ml touched :white_check_mark: 
:ship: it


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30784142
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
   * At the moment, the whole partition is used for SGD, making it 
effectively a batch gradient
   * descent. Once a sampling operator has been introduced, the algorithm 
can be optimized
   *
-  * @param runParameters The parameters to tune the algorithm. Currently 
these include:
-  *  [[Solver.LossFunction]] for the loss function to 
be used,
-  *  [[Solver.RegularizationType]] for the type of 
regularization,
-  *  [[Solver.RegularizationParameter]] for the 
regularization parameter,
+  *  The parameters to tune the algorithm are:
+  *  [[Solver.LossFunctionParameter]] for the loss 
function to be used,
+  *  [[Solver.RegularizationTypeParameter]] for the 
type of regularization,
+  *  [[Solver.RegularizationValueParameter]] for the 
regularization parameter,
--- End diff --

I'm doing this to avoid confusion between the LossFunction class and the parameter setting here. Having the same name creates some confusion, I think, especially when auto-completing things in IntelliJ. However, the names are starting to get pretty long.

Do you think we should remove this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
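
(A small toy example of the name clash being weighed here, with invented identifiers: when a parameter key object shares its name with the type it configures, the two are easy to mix up at call sites and in auto-completion.)

```scala
// Invented names for illustration; not the FlinkML definitions.
trait LossFunction                        // the type the solver is configured with
object SquaredLoss extends LossFunction

object SolverParams {
  // Key named exactly like the trait: legal Scala, but `SolverParams.LossFunction`
  // and the `LossFunction` trait are easy to confuse when auto-completing.
  case object LossFunction { val default: Option[LossFunction] = Some(SquaredLoss) }

  // The suffixed alternative from the pull request keeps the key visually distinct.
  case object LossFunctionParameter { val default: Option[LossFunction] = Some(SquaredLoss) }
}
```
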


[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784369
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
--- End diff --

Do you think we need this paragraph here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553945#comment-14553945
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30784817
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) = b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) = {
-wvDS.map{wv = {
-  val denseWeights = wv.weights match {
-case dv: DenseVector = dv
-case sv: SparseVector = sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS = {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =
+/** Calculates the regularized loss, from the data and given 
weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
 }
-  }
-  case None = createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear 
regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector = {
-SGDStep(data, weightVector)
-  }
+// We have to calculate for each weight vector the sum of squared 
residuals,
+// and then sum them and apply regularization
+val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+// Combine weight vector with the current loss
+val initialWeightsWithLossSum = initialWeightsDS.
+  crossWithTiny(initialLossSumDS).setParallelism(1)
+
+val resultWithLoss = initialWeightsWithLossSum.
+  iterateWithTermination(numberOfIterations) {
+  weightsWithLossSum =
+
+// Extract weight vector and loss
+val previousWeightsDS = weightsWithLossSum.map{_._1}
+val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+// Check if the relative change in the loss is smaller than the
+// convergence threshold. If yes, then terminate i.e. return 
empty termination data set
+val termination = 

[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553951#comment-14553951
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30785023
  
--- Diff: docs/libs/ml/index.md ---
@@ -30,7 +122,7 @@ under the License.
 /dependency
 {% endhighlight %}
 
-## Algorithms
+## Algorithm Documentation
--- End diff --

I think we should move the algorithms a bit more prominently to the top of the page.


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.9


 We should have a document describing the vision of the Machine Learning 
 library in Flink and an up to date roadmap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/688#issuecomment-104193448
  
Really nice text @thvasilo. I think it's a great introduction.

But I also think that we should change the outline of our starting page a little bit. IMO, it should be governed by what a user expects from such a starting page. If I were a new user, I would expect something along the lines
of:

1. Short introduction (your text is good if we condense the last paragraphs)
2. List of supported algorithms
3. Getting started (what dependencies to include etc.)
4. Tutorial/Example use case
5. Roadmap

I'm not so sure whether the list of algorithms should be number 2 but 
that's something to discuss. What do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553959#comment-14553959
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/688#issuecomment-104193448
  
Really nice text @thvasilo. I think it's a great introduction.

But I also think that we should change a little bit the outline of our 
starting page. IMO, it should be governed by what a user expects from such a 
starting site. If I were a new user, I would expect something along the lines 
of:

1. Short introduction (your text is good if we condense the last paragraphs)
2. List of supported algorithms
3. Getting started (what dependencies to include etc.)
4. Tutorial/Example use case
5. Roadmap

I'm not so sure whether the list of algorithms should be number 2 but 
that's something to discuss. What do you think?


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.9


 We should have a document describing the vision of the Machine Learning 
 library in Flink and an up to date roadmap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30785863
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
 ---
@@ -40,6 +44,32 @@ abstract class Solver extends Serializable with 
WithParameters {
 data: DataSet[LabeledVector],
 initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector]
 
+  /** Creates initial weights vector, creating a DataSet with a 
WeightVector element
+*
+* @param initialWeights An Option that may contain an initial set of 
weights
+* @param data The data for which we optimize the weights
+* @return A DataSet containing a single WeightVector element
+*/
+  def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]],
+ data: DataSet[LabeledVector]): DataSet[WeightVector] = {
+// TODO: Faster way to do this?
+val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
+
+initialWeights match {
+  // Ensure provided weight vector is a DenseVector
+  case Some(wvDS) =>
+wvDS.map { wv => {
+  val denseWeights = wv.weights match {
+case dv: DenseVector => dv
+case sv: SparseVector => sv.toDenseVector
+  }
+  WeightVector(denseWeights, wv.intercept)
--- End diff --

Not sure what you mean here, is it `WeightVector(denseWeights, 
wv.intercept)` that is off?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
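
For readers following the `createInitialWeightsDS` discussion above, a small hedged sketch of how a caller might supply (or omit) the initial weights that the helper then densifies. Package paths and constructors follow the 0.9-era FlinkML layout as I recall it (`org.apache.flink.ml.common`, `org.apache.flink.ml.math`) and should be treated as assumptions.

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.{LabeledVector, WeightVector}
import org.apache.flink.ml.math.DenseVector

object InitialWeightsSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Training data; in the PR this is the DataSet passed to optimize(...)
    val data: DataSet[LabeledVector] = env.fromElements(
      LabeledVector(1.0, DenseVector(0.2, 0.4)),
      LabeledVector(0.0, DenseVector(0.8, 0.1)))

    // Explicit starting point; createInitialWeightsDS would densify it if it were sparse.
    // Passing None instead lets the solver derive the dimensionality from `data`.
    val initialWeights: Option[DataSet[WeightVector]] =
      Some(env.fromElements(WeightVector(DenseVector(0.0, 0.0), 0.0)))

    initialWeights.foreach(_.print())
  }
}
{% endhighlight %}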


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553963#comment-14553963
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30785863
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
 ---
@@ -40,6 +44,32 @@ abstract class Solver extends Serializable with 
WithParameters {
 data: DataSet[LabeledVector],
 initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector]
 
+  /** Creates initial weights vector, creating a DataSet with a 
WeightVector element
+*
+* @param initialWeights An Option that may contain an initial set of 
weights
+* @param data The data for which we optimize the weights
+* @return A DataSet containing a single WeightVector element
+*/
+  def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]],
+ data: DataSet[LabeledVector]): DataSet[WeightVector] = {
+// TODO: Faster way to do this?
+val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
+
+initialWeights match {
+  // Ensure provided weight vector is a DenseVector
+  case Some(wvDS) =>
+wvDS.map { wv => {
+  val denseWeights = wv.weights match {
+case dv: DenseVector => dv
+case sv: SparseVector => sv.toDenseVector
+  }
+  WeightVector(denseWeights, wv.intercept)
--- End diff --

Not sure what you mean here, is it `WeightVector(denseWeights, 
wv.intercept)` that is off?


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30786437
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) => {
-wvDS.map{wv => {
-  val denseWeights = wv.weights match {
-case dv: DenseVector => dv
-case sv: SparseVector => sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =>
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS => {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =>
+/** Calculates the regularized loss, from the data and given weights **/
--- End diff --

That is ScalaDoc what you've written. Usually this is only used for 
classes/objects/packages/methods but not method body comments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
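
To illustrate the distinction Till is drawing, a tiny standalone Scala example: ScalaDoc (`/** ... */`) documents a declaration and is picked up by the doc tooling, while notes inside a method body are ordinary line comments.

{% highlight scala %}
object CommentStyleSketch {
  /** ScalaDoc comment: attached to a declaration (class, object, method, package). */
  def meanLoss(lossSum: Double, count: Int): Double = {
    // Plain line comment: the style suggested for explanations inside a method body.
    lossSum / count
  }

  def main(args: Array[String]): Unit = println(meanLoss(10.0, 4))
}
{% endhighlight %}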


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553976#comment-14553976
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30786608
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
--- End diff --

A separate example section is a good idea. Still, I would like to keep this 
very small example here, to make it clear that getting up and running with the 
library is just a few lines of code.


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.9


 We should have a document describing the vision of the Machine Learning 
 library in Flink and an up to date roadmap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553975#comment-14553975
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30786580
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
 ---
@@ -40,6 +44,32 @@ abstract class Solver extends Serializable with 
WithParameters {
 data: DataSet[LabeledVector],
 initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector]
 
+  /** Creates initial weights vector, creating a DataSet with a 
WeightVector element
+*
+* @param initialWeights An Option that may contain an initial set of 
weights
+* @param data The data for which we optimize the weights
+* @return A DataSet containing a single WeightVector element
+*/
+  def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]],
+ data: DataSet[LabeledVector]): DataSet[WeightVector] = {
+// TODO: Faster way to do this?
+val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
+
+initialWeights match {
+  // Ensure provided weight vector is a DenseVector
+  case Some(wvDS) =>
+wvDS.map { wv => {
+  val denseWeights = wv.weights match {
+case dv: DenseVector => dv
+case sv: SparseVector => sv.toDenseVector
+  }
+  WeightVector(denseWeights, wv.intercept)
--- End diff --

if you put `wv => {` in a new line, and indent the block of it, then it's 
fine.


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30786608
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
--- End diff --

A separate example section is a good idea. Still, I would like to keep this 
very small example here, to make it clear that getting up and running with the 
library is just a few lines of code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30786580
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Solver.scala
 ---
@@ -40,6 +44,32 @@ abstract class Solver extends Serializable with 
WithParameters {
 data: DataSet[LabeledVector],
 initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector]
 
+  /** Creates initial weights vector, creating a DataSet with a 
WeightVector element
+*
+* @param initialWeights An Option that may contain an initial set of 
weights
+* @param data The data for which we optimize the weights
+* @return A DataSet containing a single WeightVector element
+*/
+  def createInitialWeightsDS(initialWeights: Option[DataSet[WeightVector]],
+ data: DataSet[LabeledVector]): DataSet[WeightVector] = {
+// TODO: Faster way to do this?
+val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
+
+initialWeights match {
+  // Ensure provided weight vector is a DenseVector
+  case Some(wvDS) =>
+wvDS.map { wv => {
+  val denseWeights = wv.weights match {
+case dv: DenseVector => dv
+case sv: SparseVector => sv.toDenseVector
+  }
+  WeightVector(denseWeights, wv.intercept)
--- End diff --

if you put `wv => {` in a new line, and indent the block of it, then it's 
fine.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
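
A standalone analogue of the layout being suggested, with the `x => {` opener on its own line and the block indented beneath it; the names below are illustrative and not taken from the PR.

{% highlight scala %}
object LambdaFormattingSketch {
  def main(args: Array[String]): Unit = {
    // The lambda opener sits on its own line, its block indented underneath,
    // mirroring the formatting suggested for the `wv => { ... }` map function.
    val doubled = List(1, 2, 3).map {
      x => {
        val twice = x * 2
        twice
      }
    }
    println(doubled)
  }
}
{% endhighlight %}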


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553915#comment-14553915
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30783963
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) => {
-wvDS.map{wv => {
-  val denseWeights = wv.weights match {
-case dv: DenseVector => dv
-case sv: SparseVector => sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =>
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS => {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =>
+/** Calculates the regularized loss, from the data and given weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =>
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
 }
-  }
-  case None => createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector => {
-SGDStep(data, weightVector)
-  }
+// We have to calculate for each weight vector the sum of squared residuals,
+// and then sum them and apply regularization
+val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+// Combine weight vector with the current loss
+val initialWeightsWithLossSum = initialWeightsDS.
+  crossWithTiny(initialLossSumDS).setParallelism(1)
+
+val resultWithLoss = initialWeightsWithLossSum.
+  iterateWithTermination(numberOfIterations) {
+  weightsWithLossSum =>
+
+// Extract weight vector and loss
+val previousWeightsDS = weightsWithLossSum.map{_._1}
+val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+// Check if the relative change in the loss is smaller than the
+// convergence threshold. If yes, then terminate i.e. return empty termination data set
+val termination = 

[jira] [Commented] (FLINK-2050) Add pipelining mechanism for chainable transformers and estimators

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2050?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553914#comment-14553914
 ] 

ASF GitHub Bot commented on FLINK-2050:
---

Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/704#issuecomment-104186229
  
Documentation :white_check_mark: 
Scaladocs :white_check_mark: 
Tests :white_check_mark: 
No code outside of flink-ml touched :white_check_mark: 
:ship: it


 Add pipelining mechanism for chainable transformers and estimators
 --

 Key: FLINK-2050
 URL: https://issues.apache.org/jira/browse/FLINK-2050
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Till Rohrmann
  Labels: ML
 Fix For: 0.9


 The key concept of an easy to use ML library is the quick and simple 
 construction of data analysis pipelines. Scikit-learn's approach to define 
 transformers and estimators seems to be a really good solution to this 
 problem. I propose to follow a similar path, because it makes FlinkML 
 flexible in terms of code reuse as well as easy for people coming from 
 Scikit-learn to use the FlinkML.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30784001
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) => {
-wvDS.map{wv => {
-  val denseWeights = wv.weights match {
-case dv: DenseVector => dv
-case sv: SparseVector => sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =>
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS => {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =>
+/** Calculates the regularized loss, from the data and given weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =>
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
 }
-  }
-  case None => createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector => {
-SGDStep(data, weightVector)
-  }
+// We have to calculate for each weight vector the sum of squared residuals,
+// and then sum them and apply regularization
+val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+// Combine weight vector with the current loss
+val initialWeightsWithLossSum = initialWeightsDS.
+  crossWithTiny(initialLossSumDS).setParallelism(1)
+
+val resultWithLoss = initialWeightsWithLossSum.
+  iterateWithTermination(numberOfIterations) {
+  weightsWithLossSum =>
+
+// Extract weight vector and loss
+val previousWeightsDS = weightsWithLossSum.map{_._1}
+val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+// Check if the relative change in the loss is smaller than the
+// convergence threshold. If yes, then terminate i.e. return empty termination data set
+val termination = previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
+  filter{
+  pair => {
+val (previousLoss, currentLoss) = pair
+
+if (previousLoss <= 0) {
+  false
+} else 

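The quoted diff is again truncated in the archive, this time inside the termination `filter`. A plain-Scala sketch of the relative-change test it appears to implement follows; the guard on a non-positive previous loss and the exact formula are assumptions based on the visible part of the diff.

{% highlight scala %}
object ConvergenceCheckSketch {
  // Returns true while the iteration should keep going, i.e. while the relative
  // change in the loss is still at least the convergence threshold.
  def shouldContinue(previousLoss: Double, currentLoss: Double, threshold: Double): Boolean = {
    if (previousLoss <= 0) {
      // Guard against division by a non-positive previous loss (assumed behaviour)
      false
    } else {
      math.abs((previousLoss - currentLoss) / previousLoss) >= threshold
    }
  }

  def main(args: Array[String]): Unit = {
    println(shouldContinue(previousLoss = 10.0, currentLoss = 9.9, threshold = 0.001))    // true
    println(shouldContinue(previousLoss = 10.0, currentLoss = 9.9999, threshold = 0.001)) // false
  }
}
{% endhighlight %}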
[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784160
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
+
+{% highlight scala %}
+// LabelbedVector is a feature vector with a label (class or real value)
+val data: DataSet[LabelVector] = ...
--- End diff --

`LabeledVector`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784150
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
+
+{% highlight scala %}
+// LabelbedVector is a feature vector with a label (class or real value)
--- End diff --

typo: `LabeledVector`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30784205
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
   * At the moment, the whole partition is used for SGD, making it 
effectively a batch gradient
   * descent. Once a sampling operator has been introduced, the algorithm 
can be optimized
   *
-  * @param runParameters The parameters to tune the algorithm. Currently 
these include:
-  *  [[Solver.LossFunction]] for the loss function to 
be used,
-  *  [[Solver.RegularizationType]] for the type of 
regularization,
-  *  [[Solver.RegularizationParameter]] for the 
regularization parameter,
+  *  The parameters to tune the algorithm are:
+  *  [[Solver.LossFunctionParameter]] for the loss 
function to be used,
+  *  [[Solver.RegularizationTypeParameter]] for the 
type of regularization,
+  *  [[Solver.RegularizationValueParameter]] for the 
regularization parameter,
   *  [[IterativeSolver.Iterations]] for the maximum 
number of iteration,
   *  [[IterativeSolver.Stepsize]] for the learning 
rate used.
+  *  [[IterativeSolver.ConvergenceThreshold]] when 
provided the algorithm will
+  *  stop the iterations if the change in the value of 
the objective
+  *  function between successive iterations is 
smaller than this value.
   */
-class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
+class GradientDescent() extends IterativeSolver() {
--- End diff --

Nope, I'll remove these here and in the other occurrence.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
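
For completeness, a standalone analogue of the change being agreed on here: dropping the redundant empty parentheses from a parameterless class and its parent. The `*Like` names are illustrative, not the PR's classes.

{% highlight scala %}
object NoEmptyParensSketch {
  // Before: class GradientDescentLike() extends IterativeSolverLike()
  // After: no empty parentheses on either the class or the parent.
  abstract class IterativeSolverLike { def name: String = "solver" }
  class GradientDescentLike extends IterativeSolverLike

  def main(args: Array[String]): Unit = println(new GradientDescentLike().name)
}
{% endhighlight %}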


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553918#comment-14553918
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30784142
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
   * At the moment, the whole partition is used for SGD, making it 
effectively a batch gradient
   * descent. Once a sampling operator has been introduced, the algorithm 
can be optimized
   *
-  * @param runParameters The parameters to tune the algorithm. Currently 
these include:
-  *  [[Solver.LossFunction]] for the loss function to 
be used,
-  *  [[Solver.RegularizationType]] for the type of 
regularization,
-  *  [[Solver.RegularizationParameter]] for the 
regularization parameter,
+  *  The parameters to tune the algorithm are:
+  *  [[Solver.LossFunctionParameter]] for the loss 
function to be used,
+  *  [[Solver.RegularizationTypeParameter]] for the 
type of regularization,
+  *  [[Solver.RegularizationValueParameter]] for the 
regularization parameter,
--- End diff --

I'm doing this to avoid confusion between the LossFunction class and the 
parameter setting here. Having the same name I think creates some confusion, 
especially when auto-completing stuff in Intellij. However the names are 
starting to get pretty long.

You think we should remove this?


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553919#comment-14553919
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784150
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
+
+{% highlight scala %}
+// LabelbedVector is a feature vector with a label (class or real value)
--- End diff --

typo: `LabeledVector`


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.9


 We should have a document describing the vision of the Machine Learning 
 library in Flink and an up to date roadmap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553924#comment-14553924
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30784205
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
   * At the moment, the whole partition is used for SGD, making it 
effectively a batch gradient
   * descent. Once a sampling operator has been introduced, the algorithm 
can be optimized
   *
-  * @param runParameters The parameters to tune the algorithm. Currently 
these include:
-  *  [[Solver.LossFunction]] for the loss function to 
be used,
-  *  [[Solver.RegularizationType]] for the type of 
regularization,
-  *  [[Solver.RegularizationParameter]] for the 
regularization parameter,
+  *  The parameters to tune the algorithm are:
+  *  [[Solver.LossFunctionParameter]] for the loss 
function to be used,
+  *  [[Solver.RegularizationTypeParameter]] for the 
type of regularization,
+  *  [[Solver.RegularizationValueParameter]] for the 
regularization parameter,
   *  [[IterativeSolver.Iterations]] for the maximum 
number of iteration,
   *  [[IterativeSolver.Stepsize]] for the learning 
rate used.
+  *  [[IterativeSolver.ConvergenceThreshold]] when 
provided the algorithm will
+  *  stop the iterations if the change in the value of 
the objective
+  *  function between successive iterations is 
smaller than this value.
   */
-class GradientDescent(runParameters: ParameterMap) extends IterativeSolver {
+class GradientDescent() extends IterativeSolver() {
--- End diff --

Nope, I'll remove these here and in the other occurrence.


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553921#comment-14553921
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784160
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
+
+{% highlight scala %}
+// LabelbedVector is a feature vector with a label (class or real value)
+val data: DataSet[LabelVector] = ...
--- End diff --

`LabeledVector`


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.9


 We should have a document describing the vision of the Machine Learning 
 library in Flink and an up to date roadmap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784183
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
+
+{% highlight scala %}
+// LabelbedVector is a feature vector with a label (class or real value)
+val data: DataSet[LabelVector] = ...
+
+val learner = MultipleLinearRegression()
+
+val parameters = ParameterMap()
+  .add(MultipleLinearRegression.Stepsize, 1.0)
+  .add(MultipleLinearRegression.Iterations, 10)
+  .add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+val model = learner.fit(data, parameters)
--- End diff --

With the new pipelining, this has to be updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
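
A heavily hedged sketch of how the quoted `index.md` example might read once the pipelining change lands: the estimator is configured via setters and `fit`/`predict` become separate calls. The setter names, `fit`, and `predict` reflect the pipelining PR under discussion and are assumptions here, not confirmed API.

{% highlight scala %}
import org.apache.flink.api.scala._
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.DenseVector
import org.apache.flink.ml.regression.MultipleLinearRegression

object PipelineStyleSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Tiny training set; a real example would read data from a source instead
    val training: DataSet[LabeledVector] = env.fromElements(
      LabeledVector(1.0, DenseVector(1.0)),
      LabeledVector(2.0, DenseVector(2.0)))

    // Configure the estimator through setters instead of a separate ParameterMap
    val mlr = MultipleLinearRegression()
      .setStepsize(1.0)
      .setIterations(10)
      .setConvergenceThreshold(0.001)

    // Fit mutates the estimator's state; predict is a separate call on new vectors
    mlr.fit(training)
    val testing = env.fromElements(DenseVector(3.0), DenseVector(4.0))
    mlr.predict(testing).print()
  }
}
{% endhighlight %}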


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553923#comment-14553923
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784183
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
+
+{% highlight scala %}
+// LabelbedVector is a feature vector with a label (class or real value)
+val data: DataSet[LabelVector] = ...
+
+val learner = MultipleLinearRegression()
+
+val parameters = ParameterMap()
+  .add(MultipleLinearRegression.Stepsize, 1.0)
+  .add(MultipleLinearRegression.Iterations, 10)
+  .add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+val model = learner.fit(data, parameters)
--- End diff --

With the new pipelining, this has to be updated.


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.9


 We should have a document describing the vision of the Machine Learning 
 library in Flink and an up to date roadmap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553929#comment-14553929
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784369
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
--- End diff --

Do you think we need this paragraph here?


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.9


 We should have a document describing the vision of the Machine Learning 
 library in Flink and an up to date roadmap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553939#comment-14553939
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784588
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
--- End diff --

Good documentation should be self-evident IMHO. I guess we can merge this paragraph with the previous one, can't we?


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.9


 We should have a document describing the vision of the Machine Learning 
 library in Flink and an up to date roadmap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784588
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
--- End diff --

Good documentation should be self-evident IMHO. I guess we can merge this paragraph with the previous one, can't we?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30784817
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) => {
-wvDS.map{wv => {
-  val denseWeights = wv.weights match {
-case dv: DenseVector => dv
-case sv: SparseVector => sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =>
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS => {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =>
+/** Calculates the regularized loss, from the data and given 
weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =>
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
 }
-  }
-  case None => createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear 
regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector => {
-SGDStep(data, weightVector)
-  }
+// We have to calculate for each weight vector the sum of squared 
residuals,
+// and then sum them and apply regularization
+val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+// Combine weight vector with the current loss
+val initialWeightsWithLossSum = initialWeightsDS.
+  crossWithTiny(initialLossSumDS).setParallelism(1)
+
+val resultWithLoss = initialWeightsWithLossSum.
+  iterateWithTermination(numberOfIterations) {
+  weightsWithLossSum =>
+
+// Extract weight vector and loss
+val previousWeightsDS = weightsWithLossSum.map{_._1}
+val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+// Check if the relative change in the loss is smaller than the
+// convergence threshold. If yes, then terminate i.e. return 
empty termination data set
+val termination = 
previousLossSumDS.crossWithTiny(currentLossSumDS).setParallelism(1).
+  filter{
+  pair => {
+val (previousLoss, currentLoss) = pair
+
+if (previousLoss = 0) {
+  false
+} else 
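The termination logic quoted above checks whether the relative change in the regularized loss has dropped below the convergence threshold. As a rough standalone sketch of that criterion (plain Scala rather than Flink's iteration API; treating a non-positive previous loss as converged is an assumption of this sketch, not necessarily what the patch does):

def hasConverged(previousLoss: Double, currentLoss: Double, threshold: Double): Boolean = {
  if (previousLoss <= 0.0) {
    // Guards the division below; how the actual patch handles this case is not shown here.
    true
  } else {
    math.abs(previousLoss - currentLoss) / previousLoss < threshold
  }
}

// For example, a drop from 10.0 to 9.99 is a relative change of 0.001, so with a
// threshold of 0.01 the iteration would stop: hasConverged(10.0, 9.99, 0.01) == true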

[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784829
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
--- End diff --

Good idea to show how to use FlinkML. I would extend the example a little bit and put it into a separate section, maybe making a small tutorial out of it, including the things you have to add to your pom etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553946#comment-14553946
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784829
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
--- End diff --

Good idea to show how to use FlinkML. I would extend the example a little bit and put it into a separate section, maybe making a small tutorial out of it, including the things you have to add to your pom etc.


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.9


 We should have a document describing the vision of the Machine Learning 
 library in Flink and an up to date roadmap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784959
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
+
+{% highlight scala %}
+// LabeledVector is a feature vector with a label (class or real value)
+val data: DataSet[LabeledVector] = ...
+
+val learner = MultipleLinearRegression()
+
+val parameters = ParameterMap()
+  .add(MultipleLinearRegression.Stepsize, 1.0)
+  .add(MultipleLinearRegression.Iterations, 10)
+  .add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+val model = learner.fit(data, parameters)
+{% endhighlight %}
+
+The roadmap below can provide an indication of the algorithms we aim to 
implement in the coming
+months. Items in **bold** have already been implemented:
+
+
+* Pipelines of transformers and learners
+* Data pre-processing
+  * **Feature scaling**
+  * **Polynomial feature base mapper**
+  * Feature hashing
+  * Feature extraction for text
+  * Dimensionality reduction
+* Model selection and performance evaluation
+  * Cross-validation for model selection and evaluation
+* Supervised learning
+  * Optimization framework
+* **Stochastic Gradient Descent**
+* L-BFGS
+  * Generalized Linear Models
+* **Multiple linear regression**
+* LASSO, Ridge regression
+* Multi-class Logistic regression
+  * Random forests
+  * **Support Vector Machines**
+* Unsupervised learning
+  * Clustering
+* K-means clustering
+  * PCA
+* Recommendation
+  * **ALS**
+* Text analytics
+  * LDA
+* Statistical estimation tools
+* Distributed linear algebra
+* Streaming ML
--- End diff --

I like the idea of a roadmap so that people see where we're steering. But I would probably put it on a separate page where we can also elaborate a little bit on what distributed linear algebra means and how it integrates.
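To make that elaboration concrete, one purely illustrative way of presenting what distributed linear algebra could mean here (simplified stand-ins, not FlinkML code; a plain Seq stands in for a Flink DataSet):

// A matrix stored as a distributed collection of indexed rows; operations such as a
// matrix-vector product then become data-parallel maps over the rows.
case class IndexedRow(index: Long, values: Vector[Double])

class RowMatrix(rows: Seq[IndexedRow]) {
  // Multiply the row-partitioned matrix with a small vector that every worker can hold.
  def multiply(v: Vector[Double]): Seq[(Long, Double)] =
    rows.map(r => (r.index, r.values.zip(v).map { case (a, b) => a * b }.sum))
}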


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553949#comment-14553949
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30784959
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
+
+{% highlight scala %}
+// LabeledVector is a feature vector with a label (class or real value)
+val data: DataSet[LabeledVector] = ...
+
+val learner = MultipleLinearRegression()
+
+val parameters = ParameterMap()
+  .add(MultipleLinearRegression.Stepsize, 1.0)
+  .add(MultipleLinearRegression.Iterations, 10)
+  .add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+val model = learner.fit(data, parameters)
+{% endhighlight %}
+
+The roadmap below can provide an indication of the algorithms we aim to 
implement in the coming
+months. Items in **bold** have already been implemented:
+
+
+* Pipelines of transformers and learners
+* Data pre-processing
+  * **Feature scaling**
+  * **Polynomial feature base mapper**
+  * Feature hashing
+  * Feature extraction for text
+  * Dimensionality reduction
+* Model selection and performance evaluation
+  * Cross-validation for model selection and evaluation
+* Supervised learning
+  * Optimization framework
+* **Stochastic Gradient Descent**
+* L-BFGS
+  * Generalized Linear Models
+* **Multiple linear regression**
+* LASSO, Ridge regression
+* Multi-class Logistic regression
+  * Random forests
+  * **Support Vector Machines**
+* Unsupervised learning
+  * Clustering
+* K-means clustering
+  * PCA
+* Recommendation
+  * **ALS**
+* Text analytics
+  * LDA
+* Statistical estimation tools
+* Distributed linear algebra
+* Streaming ML
--- End diff --

I like the idea of a roadmap so that people see where we're steering. But I would probably put it on a separate page where we can also elaborate a little bit on what distributed linear algebra means and how it integrates.


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  

[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30785023
  
--- Diff: docs/libs/ml/index.md ---
@@ -30,7 +122,7 @@ under the License.
 /dependency
 {% endhighlight %}
 
-## Algorithms
+## Algorithm Documentation
--- End diff --

I think we should move the algorithms a little bit more prominently to the top of the page.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30785537
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala
 ---
@@ -95,6 +95,7 @@ abstract class DiffRegularization extends Regularization {
   regParameter: Double)
 }
 
+// TODO(tvas): I think NoRegularization should extend DiffRegularization
--- End diff --

Because the idea is to disallow setting a non-differentiable regularization in solvers like L-BFGS, which require a differentiable loss, through type matching.

By making NoRegularization extend DiffRegularization we would be able to use it with L-BFGS.
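A minimal, self-contained sketch of that type-matching argument (the class names mirror the ones in the diff, but the bodies are simplified stand-ins and the solver is hypothetical):

object RegularizationSketch {
  abstract class Regularization
  abstract class DiffRegularization extends Regularization
  class NoRegularization extends DiffRegularization   // the change proposed above
  class L1Regularization extends Regularization        // not differentiable at zero
  class L2Regularization extends DiffRegularization

  // A solver that needs gradients can demand a differentiable penalty at compile time.
  class LbfgsLikeSolver(val regularization: DiffRegularization)

  val ok = new LbfgsLikeSolver(new NoRegularization)          // compiles
  // val rejected = new LbfgsLikeSolver(new L1Regularization) // would not compile
}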


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553957#comment-14553957
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30785537
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala
 ---
@@ -95,6 +95,7 @@ abstract class DiffRegularization extends Regularization {
   regParameter: Double)
 }
 
+// TODO(tvas): I think NoRegularization should extend DiffRegularization
--- End diff --

Because the idea is to disallow setting a non-differentiable regularization in solvers like L-BFGS, which require a differentiable loss, through type matching.

By making NoRegularization extend DiffRegularization we would be able to use it with L-BFGS.


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553974#comment-14553974
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30786533
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala
 ---
@@ -95,6 +95,7 @@ abstract class DiffRegularization extends Regularization {
   regParameter: Double)
 }
 
+// TODO(tvas): I think NoRegularization should extend DiffRegularization
--- End diff --

Sounds reasonable.


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30786533
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/Regularization.scala
 ---
@@ -95,6 +95,7 @@ abstract class DiffRegularization extends Regularization {
   regParameter: Double)
 }
 
+// TODO(tvas): I think NoRegularization should extend DiffRegularization
--- End diff --

Sounds reasonable.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553971#comment-14553971
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30786437
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) => {
-wvDS.map{wv => {
-  val denseWeights = wv.weights match {
-case dv: DenseVector => dv
-case sv: SparseVector => sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =>
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS => {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =>
+/** Calculates the regularized loss, from the data and given 
weights **/
--- End diff --

What you've written there is ScalaDoc. Usually that is only used for classes/objects/packages/methods, not for comments inside a method body.
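For context, the distinction in a small illustrative example (not code from the pull request):

object CommentStyleSketch {
  /** ScalaDoc: attached to the following definition and picked up by the doc tooling;
    * conventionally used for classes, objects, packages and methods.
    *
    * @param x the value to double
    * @return twice the input
    */
  def double(x: Int): Int = {
    // Inside a method body, plain line comments are the usual choice; a /** ... */
    // block here would not be attached to any definition.
    x * 2
  }
}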


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30786490
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
--- End diff --

I think the reason this is here is to communicate to people who would also like to contribute to the library that we consider documentation an integral part of the library, not an afterthought.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553973#comment-14553973
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30786490
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
--- End diff --

I think the reason this is here is to communicate to people who would also like to contribute to the library that we consider documentation an integral part of the library, not an afterthought.


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  Labels: ML
 Fix For: 0.9


 We should have a document describing the vision of the Machine Learning 
 library in Flink and an up to date roadmap.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30786639
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
+
+{% highlight scala %}
+// LabeledVector is a feature vector with a label (class or real value)
+val data: DataSet[LabeledVector] = ...
+
+val learner = MultipleLinearRegression()
+
+val parameters = ParameterMap()
+  .add(MultipleLinearRegression.Stepsize, 1.0)
+  .add(MultipleLinearRegression.Iterations, 10)
+  .add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+val model = learner.fit(data, parameters)
+{% endhighlight %}
+
+The roadmap below can provide an indication of the algorithms we aim to 
implement in the coming
+months. Items in **bold** have already been implemented:
+
+
+* Pipelines of transformers and learners
+* Data pre-processing
+  * **Feature scaling**
+  * **Polynomial feature base mapper**
+  * Feature hashing
+  * Feature extraction for text
+  * Dimensionality reduction
+* Model selection and performance evaluation
+  * Cross-validation for model selection and evaluation
+* Supervised learning
+  * Optimization framework
+* **Stochastic Gradient Descent**
+* L-BFGS
+  * Generalized Linear Models
+* **Multiple linear regression**
+* LASSO, Ridge regression
+* Multi-class Logistic regression
+  * Random forests
+  * **Support Vector Machines**
+* Unsupervised learning
+  * Clustering
+* K-means clustering
+  * PCA
+* Recommendation
+  * **ALS**
+* Text analytics
+  * LDA
+* Statistical estimation tools
+* Distributed linear algebra
+* Streaming ML
--- End diff --

That's a good idea, I'll add a link to the roadmap instead.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2034) Add vision and roadmap for ML library to docs

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553977#comment-14553977
 ] 

ASF GitHub Bot commented on FLINK-2034:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30786639
  
--- Diff: docs/libs/ml/index.md ---
@@ -20,8 +20,100 @@ specific language governing permissions and limitations
 under the License.
 --
 
+The Machine Learning (ML) library for Flink is a new effort to bring 
scalable ML tools to the Flink
+community. Our goal is to design and implement a system that is 
scalable and can deal with
+problems of various sizes, whether your data size is measured in megabytes 
or terabytes and beyond.
+We call this library FlinkML.
+
+An important concern for developers of ML systems is the amount of glue 
code that developers are
+forced to write [1] in the process of implementing an end-to-end ML 
system. Our goal with FlinkML
+is to help developers keep glue code to a minimum. The Flink ecosystem 
provides a great setting to
+tackle this problem, with its scalable ETL capabilities that can be easily 
combined inside the same
+program with FlinkML, allowing the development of robust pipelines without 
the need to use yet
+another technology for data ingestion and data munging.
+
+Another goal for FlinkML is to make the library easy to use. To that end 
we will be providing
+detailed documentation along with examples for every part of the system. 
Our aim is that developers
+will be able to get started with writing their ML pipelines quickly, using 
familiar programming
+concepts and terminology.
+
+Contrary to other data-processing systems, Flink exploits in-memory data 
streaming, and natively
+executes iterative processing algorithms which are common in ML. We plan 
to exploit the streaming
+nature of Flink, and provide functionality designed specifically for data 
streams.
+
+FlinkML will allow data scientists to test their models locally and using 
subsets of data, and then
+use the same code to run their algorithms at a much larger scale in a 
cluster setting.
+
+We are inspired by other open source efforts to provide ML systems, in 
particular
+[scikit-learn](http://scikit-learn.org/) for cleanly specifying ML 
pipelines, and Spark’s
+[MLLib](https://spark.apache.org/mllib/) for providing ML algorithms that 
scale with problem and
+cluster sizes.
+
+We already have some of the building blocks for FlinkML in place, and will 
continue to extend the
+library with more algorithms. An example of how simple it is to create a 
learning model in
+FlinkML is given below:
+
+{% highlight scala %}
+// LabeledVector is a feature vector with a label (class or real value)
+val data: DataSet[LabeledVector] = ...
+
+val learner = MultipleLinearRegression()
+
+val parameters = ParameterMap()
+  .add(MultipleLinearRegression.Stepsize, 1.0)
+  .add(MultipleLinearRegression.Iterations, 10)
+  .add(MultipleLinearRegression.ConvergenceThreshold, 0.001)
+
+val model = learner.fit(data, parameters)
+{% endhighlight %}
+
+The roadmap below can provide an indication of the algorithms we aim to 
implement in the coming
+months. Items in **bold** have already been implemented:
+
+
+* Pipelines of transformers and learners
+* Data pre-processing
+  * **Feature scaling**
+  * **Polynomial feature base mapper**
+  * Feature hashing
+  * Feature extraction for text
+  * Dimensionality reduction
+* Model selection and performance evaluation
+  * Cross-validation for model selection and evaluation
+* Supervised learning
+  * Optimization framework
+* **Stochastic Gradient Descent**
+* L-BFGS
+  * Generalized Linear Models
+* **Multiple linear regression**
+* LASSO, Ridge regression
+* Multi-class Logistic regression
+  * Random forests
+  * **Support Vector Machines**
+* Unsupervised learning
+  * Clustering
+* K-means clustering
+  * PCA
+* Recommendation
+  * **ALS**
+* Text analytics
+  * LDA
+* Statistical estimation tools
+* Distributed linear algebra
+* Streaming ML
--- End diff --

That's a good idea, I'll add a link to the roadmap instead.


 Add vision and roadmap for ML library to docs
 -

 Key: FLINK-2034
 URL: https://issues.apache.org/jira/browse/FLINK-2034
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Theodore Vasiloudis
Assignee: Theodore Vasiloudis
  

[GitHub] flink pull request: [FLINK-2034] [docs] Add vision and roadmap for...

2015-05-21 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/688#discussion_r30786723
  
--- Diff: docs/libs/ml/index.md ---
@@ -30,7 +122,7 @@ under the License.
 /dependency
 {% endhighlight %}
 
-## Algorithms
+## Algorithm Documentation
--- End diff --

Yeah, this becomes a bit problematic now. This text is not really what you expect from the index of a library, so maybe it should be moved to its own section (Vision and roadmap?), with only a very brief introduction remaining on the index.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553983#comment-14553983
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30786949
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
   * At the moment, the whole partition is used for SGD, making it 
effectively a batch gradient
   * descent. Once a sampling operator has been introduced, the algorithm 
can be optimized
   *
-  * @param runParameters The parameters to tune the algorithm. Currently 
these include:
-  *  [[Solver.LossFunction]] for the loss function to 
be used,
-  *  [[Solver.RegularizationType]] for the type of 
regularization,
-  *  [[Solver.RegularizationParameter]] for the 
regularization parameter,
+  *  The parameters to tune the algorithm are:
+  *  [[Solver.LossFunctionParameter]] for the loss 
function to be used,
+  *  [[Solver.RegularizationTypeParameter]] for the 
type of regularization,
+  *  [[Solver.RegularizationValueParameter]] for the 
regularization parameter,
--- End diff --

Does IntelliJ suggest the wrong `LossFunction` type when you use it to set the parameter value in the `ParameterMap`?

I see the point about the identical type names from a programmer's perspective. As a user, though, it makes sense that the parameter is called `LossFunction`, for example. I think I'm slightly in favour of the shorter version.
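To make the trade-off concrete, a small hypothetical sketch (simplified stand-ins, not the actual FlinkML Parameter definitions):

object ParameterNamingSketch {
  trait Parameter[T] { val defaultValue: Option[T] }

  class LossFunction                                   // the value type being configured

  // Shorter variant: the parameter key shares the name of the value type. This is legal
  // (it is simply the companion object), but the same identifier now denotes both a type
  // and a key, which is where IDE completion can get confusing.
  object LossFunction extends Parameter[LossFunction] {
    val defaultValue: Option[LossFunction] = Some(new LossFunction)
  }

  // Suffixed variant: unambiguous at the cost of a longer name.
  object LossFunctionParameter extends Parameter[LossFunction] {
    val defaultValue: Option[LossFunction] = Some(new LossFunction)
  }
}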


 Add convergence criterion to SGD optimizer
 --

 Key: FLINK-1992
 URL: https://issues.apache.org/jira/browse/FLINK-1992
 Project: Flink
  Issue Type: Improvement
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Theodore Vasiloudis
Priority: Minor
  Labels: ML
 Fix For: 0.9


 Currently, Flink's SGD optimizer runs for a fixed number of iterations. It 
 would be good to support a dynamic convergence criterion, too.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: Some fixes for Scala type analysis

2015-05-21 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/669#issuecomment-104204248
  
It would be good to share a bit more information beyond `Some fixes for Scala type analysis` about what is actually changed. Then others could think about possible implications and comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1992] [ml] Add convergence criterion to...

2015-05-21 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30786949
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -36,19 +36,20 @@ import org.apache.flink.ml.optimization.Solver._
   * At the moment, the whole partition is used for SGD, making it 
effectively a batch gradient
   * descent. Once a sampling operator has been introduced, the algorithm 
can be optimized
   *
-  * @param runParameters The parameters to tune the algorithm. Currently 
these include:
-  *  [[Solver.LossFunction]] for the loss function to 
be used,
-  *  [[Solver.RegularizationType]] for the type of 
regularization,
-  *  [[Solver.RegularizationParameter]] for the 
regularization parameter,
+  *  The parameters to tune the algorithm are:
+  *  [[Solver.LossFunctionParameter]] for the loss 
function to be used,
+  *  [[Solver.RegularizationTypeParameter]] for the 
type of regularization,
+  *  [[Solver.RegularizationValueParameter]] for the 
regularization parameter,
--- End diff --

Does IntelliJ suggest the wrong `LossFunction` type when you use it to set the parameter value in the `ParameterMap`?

I see the point about the identical type names from a programmer's perspective. As a user, though, it makes sense that the parameter is called `LossFunction`, for example. I think I'm slightly in favour of the shorter version.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1992) Add convergence criterion to SGD optimizer

2015-05-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553989#comment-14553989
 ] 

ASF GitHub Bot commented on FLINK-1992:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/692#discussion_r30787129
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/optimization/GradientDescent.scala
 ---
@@ -76,86 +77,163 @@ class GradientDescent(runParameters: ParameterMap) 
extends IterativeSolver {
 }.withBroadcastSet(currentWeights, WEIGHTVECTOR_BROADCAST)
   }
 
+
+
   /** Provides a solution for the given optimization problem
 *
 * @param data A Dataset of LabeledVector (label, features) pairs
-* @param initWeights The initial weights that will be optimized
+* @param initialWeights The initial weights that will be optimized
 * @return The weights, optimized for the provided data.
 */
   override def optimize(
 data: DataSet[LabeledVector],
-initWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] = {
-// TODO: Faster way to do this?
-val dimensionsDS = data.map(_.vector.size).reduce((a, b) => b)
-
-val numberOfIterations: Int = parameterMap(Iterations)
+initialWeights: Option[DataSet[WeightVector]]): DataSet[WeightVector] 
= {
+val numberOfIterations: Int = parameters(Iterations)
+// TODO(tvas): This looks out of place, why don't we get back an 
Option from
+// parameters(ConvergenceThreshold)?
+val convergenceThresholdOption = parameters.get(ConvergenceThreshold)
 
 // Initialize weights
-val initialWeightsDS: DataSet[WeightVector] = initWeights match {
-  // Ensure provided weight vector is a DenseVector
-  case Some(wvDS) => {
-wvDS.map{wv => {
-  val denseWeights = wv.weights match {
-case dv: DenseVector => dv
-case sv: SparseVector => sv.toDenseVector
+val initialWeightsDS: DataSet[WeightVector] = 
createInitialWeightsDS(initialWeights, data)
+
+// Perform the iterations
+val optimizedWeights = convergenceThresholdOption match {
+  // No convergence criterion
+  case None =>
+initialWeightsDS.iterate(numberOfIterations) {
+  weightVectorDS => {
+SGDStep(data, weightVectorDS)
   }
-  WeightVector(denseWeights, wv.intercept)
 }
-
+  case Some(convergence) =>
+/** Calculates the regularized loss, from the data and given 
weights **/
+def lossCalculation(data: DataSet[LabeledVector], weightDS: 
DataSet[WeightVector]):
+DataSet[Double] = {
+  data.map {
+new LossCalculation
+  }.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
+.reduce {
+(left, right) =>
+  val (leftLoss, leftCount) = left
+  val (rightLoss, rightCount) = right
+  (leftLoss + rightLoss, rightCount + leftCount)
+  }
+.map{new RegularizedLossCalculation}
+.withBroadcastSet(weightDS, WEIGHTVECTOR_BROADCAST)
 }
-  }
-  case None => createInitialWeightVector(dimensionsDS)
-}
-
-// Perform the iterations
-// TODO: Enable convergence stopping criterion, as in Multiple Linear 
regression
-initialWeightsDS.iterate(numberOfIterations) {
-  weightVector => {
-SGDStep(data, weightVector)
-  }
+// We have to calculate for each weight vector the sum of squared 
residuals,
+// and then sum them and apply regularization
+val initialLossSumDS = lossCalculation(data, initialWeightsDS)
+
+// Combine weight vector with the current loss
+val initialWeightsWithLossSum = initialWeightsDS.
+  crossWithTiny(initialLossSumDS).setParallelism(1)
+
+val resultWithLoss = initialWeightsWithLossSum.
+  iterateWithTermination(numberOfIterations) {
+  weightsWithLossSum =>
+
+// Extract weight vector and loss
+val previousWeightsDS = weightsWithLossSum.map{_._1}
+val previousLossSumDS = weightsWithLossSum.map{_._2}
+
+val currentWeightsDS = SGDStep(data, previousWeightsDS)
+
+val currentLossSumDS = lossCalculation(data, currentWeightsDS)
+
+// Check if the relative change in the loss is smaller than the
+// convergence threshold. If yes, then terminate i.e. return 
empty termination data set
+val termination = 
