[jira] [Commented] (FLINK-2283) Make grouped reduce/fold/aggregations stateful using Partitioned state

2015-06-29 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605416#comment-14605416
 ] 

Stephan Ewen commented on FLINK-2283:
-

This would be a temporary solution that will most likely be touched again 
fairly soon. It may make sense to do this now, if the implementation is easy 
and lightweight such that it does not hurt if the code becomes obsolete in a 
bit...

 Make grouped reduce/fold/aggregations stateful using Partitioned state
 --

 Key: FLINK-2283
 URL: https://issues.apache.org/jira/browse/FLINK-2283
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Gyula Fora
Priority: Minor

 Currently the inner state of the grouped aggregations is not persisted as an 
 operator state. 
 These operators should be reimplemented to use the newly introduced 
 partitioned state abstractions which will make them fault tolerant and 
 scalable for the future.
 A suggested implementation would be to use a stateful mapper to implement the 
 desired behaviour.
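
 A minimal sketch of the suggested stateful-mapper approach, written against
 the keyed ValueState API as it later stabilized in Flink (the names and the
 exact state interface are assumptions, not the 0.10-era partitioned-state
 code discussed here):

    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.configuration.Configuration

    // Keeps the running reduce result per key in partitioned (keyed) state,
    // so it is checkpointed and restored by Flink.
    class StatefulReduceMapper[T](reduceFun: (T, T) => T, clazz: Class[T])
      extends RichMapFunction[T, T] {

      private var acc: ValueState[T] = _

      override def open(parameters: Configuration): Unit = {
        acc = getRuntimeContext.getState(new ValueStateDescriptor[T]("acc", clazz))
      }

      override def map(value: T): T = {
        val current = acc.value()
        val reduced = if (current == null) value else reduceFun(current, value)
        acc.update(reduced)
        reduced
      }
    }

 Usage would be on a keyed stream, e.g. stream.keyBy(...).map(new StatefulReduceMapper(...)).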



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...

2015-06-29 Thread gaborhermann
Github user gaborhermann commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116647912
  
By the way, in the Scala DataSet the user should specify the Java 
`Partitioner[K]` class. Wouldn't it be more convenient to wrap a function like 
`(K, Int) => Int` into a `Partitioner[K]` similarly to the KeySelector?
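
For illustration, such a wrapper could look like this (a sketch with a
hypothetical class name; `Partitioner` is Flink's
`org.apache.flink.api.common.functions.Partitioner`):

    import org.apache.flink.api.common.functions.Partitioner

    // Adapts a plain Scala function (K, Int) => Int to the Java Partitioner[K]
    // interface, analogous to how functions are wrapped into KeySelectors.
    class FunctionPartitioner[K](fun: (K, Int) => Int) extends Partitioner[K] {
      override def partition(key: K, numPartitions: Int): Int =
        fun(key, numPartitions)
    }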


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605571#comment-14605571
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33460336
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m1(1),…,mk(1) (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * 
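
To make the assignment and update steps quoted above concrete, here is one
Lloyd iteration over in-memory data (an illustration only, not the PR's
DataSet-based implementation):

    def lloydStep(points: Seq[Array[Double]],
                  centroids: Seq[Array[Double]]): Seq[Array[Double]] = {
      def sqDist(a: Array[Double], b: Array[Double]): Double =
        a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

      // Assignment step: each point joins the cluster of its nearest centroid,
      // which minimizes its within-cluster sum of squares (WCSS) contribution.
      val assigned = points.groupBy(p => centroids.minBy(c => sqDist(p, c)))

      // Update step: each new centroid is the arithmetic mean of its points;
      // clusters that received no points keep their old centroid.
      centroids.map { c =>
        assigned.get(c).fold(c) { ps =>
          val sums = ps.foldLeft(new Array[Double](c.length)) { (acc, p) =>
            acc.indices.foreach(i => acc(i) += p(i)); acc
          }
          sums.map(_ / ps.length)
        }
      }
    }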

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33462036
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m1(1),…,mk(1) (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as 
the starting point of the
+ * algorithm for clustering the data set. The centroids are recalculated 
as often as set in
+ * 

[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605612#comment-14605612
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33463484
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/Clustering.scala
 ---
@@ -0,0 +1,256 @@
+/*
--- End diff --

Rename to ClusteringData.scala


 Add kMeans clustering algorithm to machine learning library
 ---

 Key: FLINK-1731
 URL: https://issues.apache.org/jira/browse/FLINK-1731
 Project: Flink
  Issue Type: New Feature
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Peter Schrott
  Labels: ML

 The Flink repository already contains a kMeans implementation but it is not 
 yet ported to the machine learning library. I assume that only the used data 
 types have to be adapted and then it can be more or less directly moved to 
 flink-ml.
 The kMeans++ [1] and kMeans|| [2] algorithms constitute a better 
 implementation because they improve the initial seeding phase to achieve 
 near-optimal clustering. It might be worthwhile to implement kMeans||.
 Resources:
 [1] http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf
 [2] http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf





[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33463484
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/Clustering.scala
 ---
@@ -0,0 +1,256 @@
+/*
--- End diff --

Rename to ClusteringData.scala




[jira] [Commented] (FLINK-2138) PartitionCustom for streaming

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605634#comment-14605634
 ] 

ASF GitHub Bot commented on FLINK-2138:
---

Github user gaborhermann commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116671285
  
I'd prefer the function implementation (like `(K, Int) => Int`), but it 
should stay consistent with the batch API. I don't see why the wrapping would 
affect the compatibility checking of the partitioning.

Is it okay if I change it to the function implementation in both (Scala 
batch, Scala streaming) APIs? If not, then let's just stick with the 
partitioner implementation in the APIs.


 PartitionCustom for streaming
 -

 Key: FLINK-2138
 URL: https://issues.apache.org/jira/browse/FLINK-2138
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Affects Versions: 0.9
Reporter: Márton Balassi
Assignee: Gábor Hermann
Priority: Minor

 The batch API has support for custom partitioning; this should be added for 
 streaming with a similar signature.
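
 For reference, a usage sketch of the existing batch-API custom partitioning
 that the streaming signature would mirror (example values are ours):

    import org.apache.flink.api.common.functions.Partitioner
    import org.apache.flink.api.scala._

    object PartitionCustomBatchExample {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        val data = env.fromElements((1, "a"), (2, "b"), (3, "c"))

        // Route each record to a partition derived from its first tuple field.
        val byKeyMod = new Partitioner[Int] {
          override def partition(key: Int, numPartitions: Int): Int =
            math.abs(key % numPartitions)
        }

        data.partitionCustom(byKeyMod, 0).print()
      }
    }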





[jira] [Commented] (FLINK-2138) PartitionCustom for streaming

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605565#comment-14605565
 ] 

ASF GitHub Bot commented on FLINK-2138:
---

Github user gaborhermann commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116647912
  
By the way, in the Scala DataSet the user should specify the Java 
`Partitioner[K]` class. Wouldn't it be more convenient to wrap a function like 
`(K, Int) => Int` into a `Partitioner[K]` similarly to the KeySelector?


 PartitionCustom for streaming
 -

 Key: FLINK-2138
 URL: https://issues.apache.org/jira/browse/FLINK-2138
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Affects Versions: 0.9
Reporter: Márton Balassi
Assignee: Gábor Hermann
Priority: Minor

 The batch API has support for custom partitioning; this should be added for 
 streaming with a similar signature.





[jira] [Commented] (FLINK-2138) PartitionCustom for streaming

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605570#comment-14605570
 ] 

ASF GitHub Bot commented on FLINK-2138:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116649358
  
In the batch API, equality of the partitioners is used to determine 
compatibility of the partitioning. This may at some point become interesting 
for the streaming API as well.

In any case, let's pick one of the two variants (function or partitioner 
implementation). Overloading the methods too much with equally powerful 
variants inevitably confuses users.
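
As a sketch of why equality matters here: if two partitioner instances
compare equal, the optimizer can treat already-partitioned inputs as
compatible and avoid a re-shuffle. A function-based variant would need
equally careful `equals` semantics (hypothetical example):

    import org.apache.flink.api.common.functions.Partitioner

    class ModPartitioner(val buckets: Int) extends Partitioner[Int] {
      override def partition(key: Int, numPartitions: Int): Int =
        math.abs(key % math.min(buckets, numPartitions))

      // Two instances with the same configuration describe the same
      // partitioning, which is what the compatibility check relies on.
      override def equals(other: Any): Boolean = other match {
        case p: ModPartitioner => p.buckets == buckets
        case _ => false
      }
      override def hashCode(): Int = buckets
    }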


 PartitionCustom for streaming
 -

 Key: FLINK-2138
 URL: https://issues.apache.org/jira/browse/FLINK-2138
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Affects Versions: 0.9
Reporter: Márton Balassi
Assignee: Gábor Hermann
Priority: Minor

 The batch API has support for custom partitioning; this should be added for 
 streaming with a similar signature.





[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...

2015-06-29 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116649358
  
In the batch API, equality of the partitioners is used to determine 
compatibility of the partitioning. This may at some point become interesting 
for the streaming API as well.

In any case, let's pick one of the two variants (function or partitioner 
implementation). Overloading the methods too much with equally powerful 
variants inevitably confuses users.




[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605613#comment-14605613
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33463634
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/Clustering.scala
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import breeze.linalg.{DenseVector => BreezeDenseVector, Vector => 
BreezeVector}
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.{DenseVector, Vector}
+
+/**
+ * Training and test data sets for the K-Means implementation
+ * [[org.apache.flink.ml.clustering.KMeans]].
+ */
+object Clustering {
+
+  /*
+   * Number of iterations for the K-Means algorithm.
+   */
+  val iterations = 10
+
+  /*
+   * Sequence of initial centroids.
+   */
+  val centroidData: Seq[LabeledVector] = Seq(
+LabeledVector(1, DenseVector(-0.1369104662767052, 0.2949172396037093, 
-0.01070450818187003)),
+LabeledVector(2, DenseVector(0.43643950041582885, 0.30117329671833215, 
0.20965108353159922)),
+LabeledVector(3, DenseVector(0.26011627041438423, 0.22954649683337805, 
0.2936286262276151)),
+LabeledVector(4, DenseVector(-0.041980932305508145, 
0.03116256923634109, 0.31065743174542293)),
+LabeledVector(5, DenseVector(0.0984398491976613, -0.21227718242541602, 
-0.45083084300074255)),
+LabeledVector(6, DenseVector(-0.216526923545, 
-0.47142840804338293, -0.02298954070830948)),
+LabeledVector(7, DenseVector(-0.0632307695567563, 0.2387221400443612, 
0.09416850805771804)),
+LabeledVector(8, DenseVector(0.16383680898916775, 
-0.24586810465119346, 0.08783590589294081)),
+LabeledVector(9, DenseVector(-0.24763544645492513, 
0.19688995732231254, 0.4520904742796472)),
+LabeledVector(10, DenseVector(0.16468044138881932, 
0.06259522206982082, 0.12145870313604247))
+
+  )
+
+  /*
+   * 3-dimensional DenseVectors from a part of the Cosmo-Gas dataset
+   * Reference: http://nuage.cs.washington.edu/benchmark/
+   */
+  val trainingData: Seq[Vector] = Seq(
+DenseVector(-0.489811986685, 0.496883004904, -0.483860999346),
+DenseVector(-0.485296010971, 0.496421992779, -0.484212994576),
+DenseVector(-0.481514006853, 0.496134012938, -0.48508900404),
+DenseVector(-0.47854255, 0.496246010065, -0.486301004887),
+DenseVector(-0.475461006165, 0.496093004942, -0.487686008215),
+DenseVector(-0.471846997738, 0.496558994055, -0.488242000341),
+DenseVector(-0.467496991158, 0.497166007757, -0.48861899972),
+DenseVector(-0.463036000729, 0.497680991888, -0.489721000195),
+DenseVector(-0.458972990513, 0.4984369874, -0.490575999022),
+DenseVector(-0.455772012472, 0.499684005976, -0.491737008095),
+DenseVector(-0.453074991703, -0.499433010817, -0.492006987333),
+DenseVector(-0.450913995504, -0.499316990376, -0.492769002914),
+DenseVector(-0.448724985123, -0.499406009912, -0.493508011103),
+DenseVector(-0.44715899229, -0.499680995941, -0.494500011206),
+DenseVector(-0.445362001657, -0.499630987644, -0.495151996613),
+DenseVector(-0.442811012268, -0.499303996563, -0.495151013136),
+DenseVector(-0.439810991287, -0.499332994223, -0.49529799819),
+DenseVector(-0.43678098917, -0.499361991882, -0.49545699358),
+DenseVector(-0.433919012547, -0.499334007502, -0.495705991983),
+DenseVector(-0.43117800355, -0.499345004559, -0.496196985245),
+DenseVector(-0.428333997726, -0.499083012342, -0.496385991573),
+DenseVector(-0.425300985575, -0.49844199419, -0.496405988932),
+DenseVector(-0.421882003546, -0.497743010521, -0.496706992388),
+

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33463634
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/Clustering.scala
 ---
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import breeze.linalg.{DenseVector => BreezeDenseVector, Vector => 
BreezeVector}
+import org.apache.flink.ml.common.LabeledVector
+import org.apache.flink.ml.math.{DenseVector, Vector}
+
+/**
+ * Training and test data sets for the K-Means implementation
+ * [[org.apache.flink.ml.clustering.KMeans]].
+ */
+object Clustering {
+
+  /*
+   * Number of iterations for the K-Means algorithm.
+   */
+  val iterations = 10
+
+  /*
+   * Sequence of initial centroids.
+   */
+  val centroidData: Seq[LabeledVector] = Seq(
+LabeledVector(1, DenseVector(-0.1369104662767052, 0.2949172396037093, 
-0.01070450818187003)),
+LabeledVector(2, DenseVector(0.43643950041582885, 0.30117329671833215, 
0.20965108353159922)),
+LabeledVector(3, DenseVector(0.26011627041438423, 0.22954649683337805, 
0.2936286262276151)),
+LabeledVector(4, DenseVector(-0.041980932305508145, 
0.03116256923634109, 0.31065743174542293)),
+LabeledVector(5, DenseVector(0.0984398491976613, -0.21227718242541602, 
-0.45083084300074255)),
+LabeledVector(6, DenseVector(-0.216526923545, 
-0.47142840804338293, -0.02298954070830948)),
+LabeledVector(7, DenseVector(-0.0632307695567563, 0.2387221400443612, 
0.09416850805771804)),
+LabeledVector(8, DenseVector(0.16383680898916775, 
-0.24586810465119346, 0.08783590589294081)),
+LabeledVector(9, DenseVector(-0.24763544645492513, 
0.19688995732231254, 0.4520904742796472)),
+LabeledVector(10, DenseVector(0.16468044138881932, 
0.06259522206982082, 0.12145870313604247))
+
+  )
+
+  /*
+   * 3-dimensional DenseVectors from a part of the Cosmo-Gas dataset
+   * Reference: http://nuage.cs.washington.edu/benchmark/
+   */
+  val trainingData: Seq[Vector] = Seq(
+DenseVector(-0.489811986685, 0.496883004904, -0.483860999346),
+DenseVector(-0.485296010971, 0.496421992779, -0.484212994576),
+DenseVector(-0.481514006853, 0.496134012938, -0.48508900404),
+DenseVector(-0.47854255, 0.496246010065, -0.486301004887),
+DenseVector(-0.475461006165, 0.496093004942, -0.487686008215),
+DenseVector(-0.471846997738, 0.496558994055, -0.488242000341),
+DenseVector(-0.467496991158, 0.497166007757, -0.48861899972),
+DenseVector(-0.463036000729, 0.497680991888, -0.489721000195),
+DenseVector(-0.458972990513, 0.4984369874, -0.490575999022),
+DenseVector(-0.455772012472, 0.499684005976, -0.491737008095),
+DenseVector(-0.453074991703, -0.499433010817, -0.492006987333),
+DenseVector(-0.450913995504, -0.499316990376, -0.492769002914),
+DenseVector(-0.448724985123, -0.499406009912, -0.493508011103),
+DenseVector(-0.44715899229, -0.499680995941, -0.494500011206),
+DenseVector(-0.445362001657, -0.499630987644, -0.495151996613),
+DenseVector(-0.442811012268, -0.499303996563, -0.495151013136),
+DenseVector(-0.439810991287, -0.499332994223, -0.49529799819),
+DenseVector(-0.43678098917, -0.499361991882, -0.49545699358),
+DenseVector(-0.433919012547, -0.499334007502, -0.495705991983),
+DenseVector(-0.43117800355, -0.499345004559, -0.496196985245),
+DenseVector(-0.428333997726, -0.499083012342, -0.496385991573),
+DenseVector(-0.425300985575, -0.49844199419, -0.496405988932),
+DenseVector(-0.421882003546, -0.497743010521, -0.496706992388),
+DenseVector(-0.418137013912, -0.497193992138, -0.496524989605),
+DenseVector(-0.414458990097, -0.496717989445, -0.49600699544),
+DenseVector(-0.411509007215, -0.495965003967, -0.495519012213),
+DenseVector(-0.40851598978, -0.49593898654, 

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-116669400
  
Another note: it should not be necessary for the user to provide the 
initial centroids; it should be possible to generate them from the algorithm 
itself, ideally with a scheme like kmeans++.
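
A local, in-memory sketch of the k-means++ seeding scheme mentioned here
(illustrative only; a flink-ml version would run over a DataSet):

    import scala.util.Random

    def kMeansPlusPlusSeeds(points: IndexedSeq[Array[Double]], k: Int,
                            rnd: Random): Seq[Array[Double]] = {
      def sqDist(a: Array[Double], b: Array[Double]): Double =
        a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

      // First centroid: a uniformly random point.
      var centroids = Vector(points(rnd.nextInt(points.length)))
      while (centroids.length < k) {
        // D^2 weighting: sample the next centroid with probability
        // proportional to the squared distance to the nearest chosen centroid.
        val weights = points.map(p => centroids.map(c => sqDist(p, c)).min)
        var r = rnd.nextDouble() * weights.sum
        val idx = weights.indexWhere { w => r -= w; r <= 0 }
        centroids = centroids :+ points(if (idx >= 0) idx else points.length - 1)
      }
      centroids
    }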




[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33460336
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m1(1),…,mk(1) (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as 
the starting point of the
+ * algorithm for clustering the data set. The centroids are recalculated 
as often as set in
+ * 

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33461173
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m1(1),…,mk(1) (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as 
the starting point of the
+ * algorithm for clustering the data set. The centroids are recalculated 
as often as set in
+ * 

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33463192
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m1(1),…,mk(1) (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as 
the starting point of the
+ * algorithm for clustering the data set. The centroids are recalculated 
as often as set in
+ * 

[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605607#comment-14605607
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33463192
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m1(1),…,mk(1) (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * 

[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605627#comment-14605627
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33465223
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/Clustering.scala
 ---
@@ -0,0 +1,256 @@
+/*
--- End diff --

done


 Add kMeans clustering algorithm to machine learning library
 ---

 Key: FLINK-1731
 URL: https://issues.apache.org/jira/browse/FLINK-1731
 Project: Flink
  Issue Type: New Feature
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Peter Schrott
  Labels: ML

 The Flink repository already contains a kMeans implementation but it is not 
 yet ported to the machine learning library. I assume that only the used data 
 types have to be adapted and then it can be more or less directly moved to 
 flink-ml.
 The kMeans++ [1] and kMeans|| [2] algorithms constitute a better 
 implementation because they improve the initial seeding phase to achieve 
 near-optimal clustering. It might be worthwhile to implement kMeans||.
 Resources:
 [1] http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf
 [2] http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf





[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33465223
  
--- Diff: 
flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/Clustering.scala
 ---
@@ -0,0 +1,256 @@
+/*
--- End diff --

done




[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...

2015-06-29 Thread gaborhermann
Github user gaborhermann commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116671285
  
I'd prefer the function implementation (like `(K, Int) => Int`), but it 
should stay consistent with the batch API. I don't see why the wrapping would 
affect the compatibility checking of the partitioning.

Is it okay if I change it to the function implementation in both (Scala 
batch, Scala streaming) APIs? If not, then let's just stick with the 
partitioner implementation in the APIs.




[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605573#comment-14605573
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33460529
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m1(1),…,mk(1) (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * 

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33460529
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means m1(1),…,mk(1) (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations used to recalculate the centroids of the 
clusters. As KMeans
+ * is a heuristic algorithm, there is no guarantee that it converges to the 
global optimum. The
+ * recalculation of the centroids and the reassignment of the data points are 
repeated until the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as the 
starting point of the
+ * algorithm for clustering the data set. The centroids are recalculated 
as often as set in
+ * 
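For reference, the two steps quoted above can be sketched in a few lines of
plain Scala (a local, in-memory illustration of the math only, not the
distributed Flink operator under review):

{code}
// One KMeans iteration over in-memory points: assignment step (nearest
// centroid by squared Euclidean distance), then update step (cluster means).
// Clusters that lose all of their points are dropped in this simplified sketch.
object KMeansSketch {
  type Point = Vector[Double]

  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  def iterate(points: Seq[Point], centroids: IndexedSeq[Point]): IndexedSeq[Point] =
    points
      .groupBy(p => centroids.indices.minBy(i => sqDist(p, centroids(i)))) // assignment
      .toIndexedSeq.sortBy(_._1)
      .map { case (_, cluster) =>                                          // update
        Vector.tabulate(cluster.head.size)(d => cluster.map(_(d)).sum / cluster.size)
      }

  def main(args: Array[String]): Unit = {
    val pts  = Seq(Vector(0.0, 0.0), Vector(0.0, 1.0), Vector(9.0, 9.0), Vector(10.0, 9.0))
    val init = IndexedSeq(Vector(0.0, 0.0), Vector(10.0, 10.0))
    println(Iterator.iterate(init)(c => iterate(pts, c)).drop(10).next()) // 10 iterations
  }
}
{code}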

[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605585#comment-14605585
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33461173
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---

[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605598#comment-14605598
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33462286
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33462286
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---

[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times

2015-06-29 Thread Robert Metzger (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605544#comment-14605544
 ] 

Robert Metzger commented on FLINK-2008:
---

I'm looking into these issues now ...

 PersistentKafkaSource is sometimes emitting tuples multiple times
 -

 Key: FLINK-2008
 URL: https://issues.apache.org/jira/browse/FLINK-2008
 Project: Flink
  Issue Type: Bug
  Components: Kafka Connector, Streaming
Affects Versions: 0.9
Reporter: Robert Metzger
Assignee: Robert Metzger

 The PersistentKafkaSource is expected to emit records exactly once.
 Two test cases of the KafkaITCase are sporadically failing because records 
 are emitted multiple times.
 Affected tests:
 {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been 
 changed manually in ZK:
 {code}
 java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 
 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 
 2]
 {code}
 {{brokerFailureTest()}} also fails:
 {code}
 05/13/2015 08:13:16   Custom source - Stream Sink(1/1) switched to FAILED 
 java.lang.AssertionError: Received tuple with value 21 twice
   at org.junit.Assert.fail(Assert.java:88)
   at org.junit.Assert.assertTrue(Assert.java:41)
   at org.junit.Assert.assertFalse(Assert.java:64)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877)
   at 
 org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859)
   at 
 org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54)
   at 
 org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39)
   at 
 org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40)
   at 
 org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137)
   at 
 org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34)
   at 
 org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
   at java.lang.Thread.run(Thread.java:745)
 {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605595#comment-14605595
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33462036
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---

[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605615#comment-14605615
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-116662421
  
Hello, I've left some initial comments. Once those have been addressed, I'll 
try to do some more integration testing and then pass the review over to a 
committer.


 Add kMeans clustering algorithm to machine learning library
 ---

 Key: FLINK-1731
 URL: https://issues.apache.org/jira/browse/FLINK-1731
 Project: Flink
  Issue Type: New Feature
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Peter Schrott
  Labels: ML

 The Flink repository already contains a kMeans implementation but it is not 
 yet ported to the machine learning library. I assume that only the used data 
 types have to be adapted and then it can be more or less directly moved to 
 flink-ml.
 The kMeans++ [1] and kMeans|| [2] algorithms constitute a better 
 implementation because they improve the initial seeding phase to achieve 
 near-optimal clustering. It might be worthwhile to implement kMeans||.
 Resources:
 [1] http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf
 [2] http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread thvasilo
Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-116662421
  
Hello, I've left some initial comments. Once those have been addressed, I'll 
try to do some more integration testing and then pass the review over to a 
committer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605638#comment-14605638
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33466208
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---

[jira] [Commented] (FLINK-2200) Flink API with Scala 2.11 - Maven Repository

2015-06-29 Thread Chiwan Park (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605502#comment-14605502
 ] 

Chiwan Park commented on FLINK-2200:


Hi, I'm working on this issue. I found a problem: many modules are affected by 
the Scala version variation. Since flink-runtime and flink-clients depend on 
the Scala version, almost all Flink modules are affected by this change.

I know we discussed and concluded that only the names of the affected modules 
should be changed, but I think we have to rethink this.
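For illustration, the naming scheme under discussion is the usual
Scala-ecosystem convention of suffixing the artifact ID with the Scala binary
version. A hypothetical sbt snippet (artifact names taken from this thread,
version assumed):

{code}
// sbt's %% operator appends the project's Scala binary version (e.g. "_2.11")
// to the artifact ID, so the same build resolves flink-scala_2.10 or
// flink-scala_2.11 depending on scalaVersion.
scalaVersion := "2.11.7"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"   % "0.10-SNAPSHOT", // -> flink-scala_2.11
  "org.apache.flink" %% "flink-clients" % "0.10-SNAPSHOT"  // -> flink-clients_2.11
)
{code}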

 Flink API with Scala 2.11 - Maven Repository
 

 Key: FLINK-2200
 URL: https://issues.apache.org/jira/browse/FLINK-2200
 Project: Flink
  Issue Type: Wish
  Components: Build System, Scala API
Reporter: Philipp Götze
Assignee: Chiwan Park
Priority: Trivial
  Labels: maven

 It would be nice if you could upload a pre-built version of the Flink API 
 with Scala 2.11 to the maven repository.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2289) Make JobManager highly available

2015-06-29 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605534#comment-14605534
 ] 

Till Rohrmann commented on FLINK-2289:
--

Sorry, I didn't see your issue. Will do.

 Make JobManager highly available
 

 Key: FLINK-2289
 URL: https://issues.apache.org/jira/browse/FLINK-2289
 Project: Flink
  Issue Type: Improvement
Reporter: Till Rohrmann
Assignee: Till Rohrmann

 Currently, the {{JobManager}} is the single point of failure in the Flink 
 system. If it fails, then your job cannot be recovered and the Flink cluster 
 is no longer able to receive new jobs.
 Therefore, it is crucial to make the {{JobManager}} fault tolerant so that 
 the Flink cluster can recover from a failed {{JobManager}}. As a first step 
 towards this goal, I propose to make the {{JobManager}} highly available by 
 starting multiple instances and using Apache ZooKeeper to elect a leader. The 
 leader is responsible for the execution of the Flink job. 
 In case that the {{JobManager}} dies, one of the other running {{JobManager}} 
 will be elected as the leader and take over the role of the leader. The 
 {{Client}} and the {{TaskManager}} will automatically detect the new 
 {{JobManager}} by querying the ZooKeeper cluster.
 Note that this does not achieve full fault tolerance for the {{JobManager}} 
 but it allows the cluster to recover from a failed {{JobManager}}. The design 
 of high-availability for the {{JobManager}} is tracked in the wiki here [1].
 Resources:
 [1] 
 [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2284) Confusing/inconsistent PartitioningStrategy

2015-06-29 Thread Matthias J. Sax (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605353#comment-14605353
 ] 

Matthias J. Sax commented on FLINK-2284:


It is documented in {{org.apache.flink.streaming.api.datastream.DataStream}}. 
And this is the API users see.

 Confusing/inconsistent PartitioningStrategy
 ---

 Key: FLINK-2284
 URL: https://issues.apache.org/jira/browse/FLINK-2284
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.9
Reporter: Stephan Ewen

 The PartitioningStrategy in 
 {{org.apache.flink.streaming.runtime.partitioner.StreamPartitioner.java}} is 
 non-standard and not easily understandable.
 What form of partitioning is `SHUFFLE`? Shuffle just means redistribute, it 
 says nothing about what it does. Same with `DISTRIBUTE`. Also `GLOBAL` is not 
 a well-defined/established term. Why is `GROUPBY` a partition type? Doesn't 
 grouping simply hash partition (like I assume SHUFFLE means), so why does 
 it have an extra entry?
 Sticking with principled and established names/concepts is important to allow 
 people to collaborate on the code. 
 Why not stick with the partitioning types defined in the batch API? They are 
 well defined and named:
 ```
 NONE, FORWARD, RANDOM, HASH, RANGE, FORCED_REBALANCE, BROADCAST, CUSTOM
 ```



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2290) CoRecordReader Does Not Read Events From Both Inputs When No Elements Arrive

2015-06-29 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-2290:
---

Assignee: Aljoscha Krettek

 CoRecordReader Does Not Read Events From Both Inputs When No Elements Arrive
 

 Key: FLINK-2290
 URL: https://issues.apache.org/jira/browse/FLINK-2290
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek

 When no elements arrive, the reader will always try to read from the same 
 input index. This means that it will only process elements from this input. 
 This could be problematic with Watermarks/Checkpoint barriers.
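A minimal sketch of the kind of fix this implies (a hypothetical helper, not
the actual CoRecordReader code): rotate the starting input index so that a
quiet input still gets polled and its watermarks/checkpoint barriers are
eventually read.

{code}
// Hypothetical round-robin chooser over N inputs: each probe starts at the
// input after the one read last, so no input can be starved indefinitely.
class AlternatingInputChooser(numInputs: Int) {
  private var next = 0

  /** Returns the first readable input, rotating the starting point. */
  def chooseReadable(isReadable: Int => Boolean): Option[Int] = {
    val candidates = (0 until numInputs).map(i => (next + i) % numInputs)
    val chosen = candidates.find(isReadable)
    chosen.foreach(i => next = (i + 1) % numInputs)
    chosen
  }
}
{code}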



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2291) Use ZooKeeper to elect JobManager leader and send information to TaskManagers

2015-06-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-2291:


 Summary: Use ZooKeeper to elect JobManager leader and send 
information to TaskManagers
 Key: FLINK-2291
 URL: https://issues.apache.org/jira/browse/FLINK-2291
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Till Rohrmann
Assignee: Till Rohrmann


Use ZooKeeper to determine the leader of a set of {{JobManager}}s which will 
act as the responsible {{JobManager}} for all {{TaskManager}}. The 
{{TaskManager}} will get the address of the leader from ZooKeeper.

Related Wiki: 
[https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]
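For reference, leader election on ZooKeeper is typically built on Curator's
LeaderLatch recipe. A minimal sketch (connection string and znode paths are
assumptions; the actual Flink implementation may differ):

{code}
// Every JobManager candidate opens a LeaderLatch on the same znode path;
// ZooKeeper grants leadership to exactly one of them at a time.
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderElectionSketch extends App {
  val client = CuratorFrameworkFactory.newClient(
    "zookeeper:2181", new ExponentialBackoffRetry(1000, 3))
  client.start()

  val latch = new LeaderLatch(client, "/flink/leader/jobmanager")
  latch.start()
  latch.await() // blocks until this candidate becomes the leader

  // As leader, publish the address TaskManagers and Clients should connect to
  // (hypothetical znode; the real protocol is described in the wiki above).
  client.create().creatingParentsIfNeeded()
    .forPath("/flink/leader/address", "jobmanager-host:6123".getBytes("UTF-8"))
}
{code}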



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2283) Make grouped reduce/fold/aggregations stateful using Partitioned state

2015-06-29 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605357#comment-14605357
 ] 

Gyula Fora commented on FLINK-2283:
---

I would use stateful Java maps backed by the PartitionedState for now (see the 
sketch after the list below).

I see several reasons to do this instead of more complex out-of-core 
implementations:

- State will be properly checkpointed with no additional implementation
- We can use the state backend (if necessary) to handle out-of-core state (this 
is probably a perfect candidate for lazy state fetching)
- This implementation will scale easily if we implement it for the partitioned 
state
- It's a trivial implementation, while managed memory would probably add a lot 
of overhead
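A minimal sketch of that stateful-mapper idea, assuming a plain in-memory map
per parallel instance (with the partitioned state abstraction, this map is
exactly what would be checkpointed and redistributed by the runtime):

{code}
// Grouped reduce as a stateful mapper: one running aggregate per key.
// The `key` and `reduce` constructor parameters are hypothetical and only
// for illustration; they are not the actual operator's API.
import org.apache.flink.api.common.functions.RichMapFunction
import scala.collection.mutable

class StatefulReduce[K, V](key: V => K, reduce: (V, V) => V)
    extends RichMapFunction[V, V] {

  // In-memory per-key state; today this lives unmanaged on the heap, with
  // partitioned state it would become fault tolerant and rescalable.
  private val state = mutable.Map.empty[K, V]

  override def map(value: V): V = {
    val k = key(value)
    val acc = state.get(k).map(reduce(_, value)).getOrElse(value)
    state(k) = acc
    acc // emit the updated aggregate for this key
  }
}
{code}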

 Make grouped reduce/fold/aggregations stateful using Partitioned state
 --

 Key: FLINK-2283
 URL: https://issues.apache.org/jira/browse/FLINK-2283
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Gyula Fora
Priority: Minor

 Currently the inner state of the grouped aggregations are not persisted as an 
 operator state. 
 These operators should be reimplemented to use the newly introduced 
 partitioned state abstractions which will make them fault tolerant and 
 scalable for the future.
 A suggested implementation would be to use a stateful mapper to implement the 
 desired behaviour.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2289) Make JobManager highly available

2015-06-29 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-2289.

Resolution: Duplicate

See FLINK-2287

 Make JobManager highly available
 

 Key: FLINK-2289
 URL: https://issues.apache.org/jira/browse/FLINK-2289
 Project: Flink
  Issue Type: Improvement
Reporter: Till Rohrmann
Assignee: Till Rohrmann

 Currently, the {{JobManager}} is the single point of failure in the Flink 
 system. If it fails, then your job cannot be recovered and the Flink cluster 
 is no longer able to receive new jobs.
 Therefore, it is crucial to make the {{JobManager}} fault tolerant so that 
 the Flink cluster can recover from a failed {{JobManager}}. As a first step 
 towards this goal, I propose to make the {{JobManager}} highly available by 
 starting multiple instances and using Apache ZooKeeper to elect a leader. The 
 leader is responsible for the execution of the Flink job. 
 In case that the {{JobManager}} dies, one of the other running {{JobManager}} 
 will be elected as the leader and take over the role of the leader. The 
 {{Client}} and the {{TaskManager}} will automatically detect the new 
 {{JobManager}} by querying the ZooKeeper cluster.
 Note that this does not achieve full fault tolerance for the {{JobManager}} 
 but it allows the cluster to recover from a failed {{JobManager}}. The design 
 of high-availability for the {{JobManager}} is tracked in the wiki here [1].
 Resources:
 [1] 
 [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2285) Active policy emits elements of the last window twice

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605340#comment-14605340
 ] 

ASF GitHub Bot commented on FLINK-2285:
---

GitHub user mbalassi opened a pull request:

https://github.com/apache/flink/pull/873

[FLINK-2285] [streaming] Removed duplicate call in close from 
GroupedActiveDiscretizer



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mbalassi/flink flink-2285

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/873.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #873


commit 1065a2893afd2c342ccc8949928273862426162c
Author: mbalassi mbala...@apache.org
Date:   2015-06-29T09:17:21Z

[FLINK-2285] [streaming] Removed duplicate call in close from 
GroupedActiveDiscretizer

commit e0bd71a2589ccd7e90c6ca657587749fcd56d766
Author: mbalassi mbala...@apache.org
Date:   2015-06-29T09:21:23Z

[streaming] Minor streaming code cleanups




 Active policy emits elements of the last window twice
 -

 Key: FLINK-2285
 URL: https://issues.apache.org/jira/browse/FLINK-2285
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.9, 0.10
Reporter: Márton Balassi
Assignee: Márton Balassi

 The root cause is some duplicate code between the close methods of the 
 GroupedActiveDiscretizer and the GroupedStreamDiscretizer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2285] [streaming] Removed duplicate cal...

2015-06-29 Thread mbalassi
GitHub user mbalassi opened a pull request:

https://github.com/apache/flink/pull/873

[FLINK-2285] [streaming] Removed duplicate call in close from 
GroupedActiveDiscretizer



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mbalassi/flink flink-2285

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/873.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #873


commit 1065a2893afd2c342ccc8949928273862426162c
Author: mbalassi mbala...@apache.org
Date:   2015-06-29T09:17:21Z

[FLINK-2285] [streaming] Removed duplicate call in close from 
GroupedActiveDiscretizer

commit e0bd71a2589ccd7e90c6ca657587749fcd56d766
Author: mbalassi mbala...@apache.org
Date:   2015-06-29T09:21:23Z

[streaming] Minor streaming code cleanups




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Updated] (FLINK-2291) Use ZooKeeper to elect JobManager leader and send information to TaskManagers

2015-06-29 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-2291:
-
Description: 
Use ZooKeeper to determine the leader of a set of {{JobManager}} s which will 
act as the responsible {{JobManager}} for all {{TaskManager}}. The 
{{TaskManager}} will get the address of the leader from ZooKeeper.

Related Wiki: 
[https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]

  was:
Use ZooKeeper to determine the leader of a set of {{JobManager}}s which will 
act as the responsible {{JobManager}} for all {{TaskManager}}. The 
{{TaskManager}} will get the address of the leader from ZooKeeper.

Related Wiki: 
[https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]


 Use ZooKeeper to elect JobManager leader and send information to TaskManagers
 -

 Key: FLINK-2291
 URL: https://issues.apache.org/jira/browse/FLINK-2291
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 0.10


 Use ZooKeeper to determine the leader of a set of {{JobManager}} s which will 
 act as the responsible {{JobManager}} for all {{TaskManager}}. The 
 {{TaskManager}} will get the address of the leader from ZooKeeper.
 Related Wiki: 
 [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (FLINK-2291) Use ZooKeeper to elect JobManager leader and send information to TaskManagers

2015-06-29 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-2291:
-
Description: 
Use ZooKeeper to determine the leader of a set of {{JobManagers}} which will 
act as the responsible {{JobManager}} for all {{TaskManager}}. The 
{{TaskManager}} will get the address of the leader from ZooKeeper.

Related Wiki: 
[https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]

  was:
Use ZooKeeper to determine the leader of a set of {{JobManager}} s which will 
act as the responsible {{JobManager}} for all {{TaskManager}}. The 
{{TaskManager}} will get the address of the leader from ZooKeeper.

Related Wiki: 
[https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]


 Use ZooKeeper to elect JobManager leader and send information to TaskManagers
 -

 Key: FLINK-2291
 URL: https://issues.apache.org/jira/browse/FLINK-2291
 Project: Flink
  Issue Type: Sub-task
  Components: JobManager, TaskManager
Affects Versions: 0.10
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 0.10


 Use ZooKeeper to determine the leader of a set of {{JobManagers}} which will 
 act as the responsible {{JobManager}} for all {{TaskManager}}. The 
 {{TaskManager}} will get the address of the leader from ZooKeeper.
 Related Wiki: 
 [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33469076
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33476507
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---

[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...

2015-06-29 Thread gaborhermann
Github user gaborhermann commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116736041
  
Sorry for not making myself clear.

I would actually go for
4. Only the Scala function (both in the streaming and batch API)

I don't understand how changing from partitioner implementation to function 
implementation in the batch API would mess up determining the compatibility of 
the partitioning. By compatibility I mean the type of the key must be the same 
as the input of the partitioner.

I suppose there was another reason (that I do not understand) for choosing 
the partitioner implementation for the Scala batch API, so if (4) is not an 
option, I would go for (2) (only partitioner, sync with batch API).
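
For what it's worth, option (4) is cheap to layer on top of the existing API; 
a sketch of the wrapper (hypothetical helper name):

{code}
// Lift a plain Scala function (key, numPartitions) => partition into
// Flink's Partitioner[K] interface.
import org.apache.flink.api.common.functions.Partitioner

object PartitionerOps {
  def asPartitioner[K](f: (K, Int) => Int): Partitioner[K] =
    new Partitioner[K] {
      override def partition(key: K, numPartitions: Int): Int =
        f(key, numPartitions)
    }
}

// Usage sketch:
//   ds.partitionCustom(PartitionerOps.asPartitioner((k: Long, n: Int) => (k % n).toInt), 0)
{code}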


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605726#comment-14605726
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33471983
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means `m_1^{(1)}, …, m_k^{(1)}` (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum  of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * 

[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605752#comment-14605752
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33475298
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means `m_1^{(1)}, …, m_k^{(1)}` (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum  of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * 

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33475298
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means `m_1^{(1)}, …, m_k^{(1)}` (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum  of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as the 
starting point of the
+ * algorithm for clustering the data set. The centroids are recalculated 
as often as set in
+ * 

[jira] [Commented] (FLINK-2138) PartitionCustom for streaming

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605774#comment-14605774
 ] 

ASF GitHub Bot commented on FLINK-2138:
---

Github user gaborhermann commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116736041
  
Sorry for not making myself clear.

I would actually go for
4. Only the Scala function (both in the streaming and batch API)

I don't understand how changing from a partitioner implementation to a function 
implementation in the batch API would mess up determining the compatibility of 
the partitioning. By compatibility I mean that the type of the key must be the 
same as the input type of the partitioner.

I suppose there was another reason (that I do not understand) for choosing 
the partitioner implementation for the Scala batch API, so if (4) is not an 
option, I would go for (2) (only partitioner, sync with batch API).


 PartitionCustom for streaming
 -

 Key: FLINK-2138
 URL: https://issues.apache.org/jira/browse/FLINK-2138
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Affects Versions: 0.9
Reporter: Márton Balassi
Assignee: Gábor Hermann
Priority: Minor

 The batch API has support for custom partitioning; this should be added for 
 streaming with a similar signature.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2138) PartitionCustom for streaming

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605711#comment-14605711
 ] 

ASF GitHub Bot commented on FLINK-2138:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116710090
  
I am confused now: what is it going to be?
  1. Overloading, such that it is Scala function and Partitioner, at the 
cost of redundant APIs.
  2. Only partitioner (sync with batch API)
  3. Only Scala function (break with batch API)

I am not a big fan of (1), as these redundant options are confusing 
blow-ups of the APIs.


 PartitionCustom for streaming
 -

 Key: FLINK-2138
 URL: https://issues.apache.org/jira/browse/FLINK-2138
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Affects Versions: 0.9
Reporter: Márton Balassi
Assignee: Gábor Hermann
Priority: Minor

 The batch API has support for custom partitioning; this should be added for 
 streaming with a similar signature.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...

2015-06-29 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116710090
  
I am confused now: what is it going to be?
  1. Overloading, such that it is Scala function and Partitioner, at the 
cost of redundant APIs.
  2. Only partitioner (sync with batch API)
  3. Only Scala function (break with batch API)

I am not a big fan of (1), as these redundant options are confusing 
blow-ups of the APIs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33471983
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means `m_1^{(1)}, …, m_k^{(1)}` (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum  of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as the 
starting point of the
+ * algorithm for clustering the data set. The centroids are recalculated 
as often as set in
+ * 

[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33475321
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means `m_1^{(1)}, …, m_k^{(1)}` (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum  of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * Defines the initial k centroids of the k clusters. They are used as the 
starting point of the
+ * algorithm for clustering the data set. The centroids are recalculated 
as often as set in
+ * 

[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605753#comment-14605753
 ] 

ASF GitHub Bot commented on FLINK-1731:
---

Github user peedeeX21 commented on a diff in the pull request:

https://github.com/apache/flink/pull/700#discussion_r33475321
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala
 ---
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.clustering
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import 
org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields
+import org.apache.flink.api.scala.{DataSet, _}
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.{LabeledVector, _}
+import org.apache.flink.ml.math.Breeze._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric
+import org.apache.flink.ml.pipeline._
+
+import scala.collection.JavaConverters._
+
+
+/**
+ * Implements the KMeans algorithm which calculates cluster centroids 
based on a set of training data
+ * points and a set of k initial centroids.
+ *
+ * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of 
data points and can then be
+ * used to assign new points to the learned cluster centroids.
+ *
+ * The KMeans algorithm works as described on Wikipedia
+ * (http://en.wikipedia.org/wiki/K-means_clustering):
+ *
+ * Given an initial set of k means `m_1^{(1)}, …, m_k^{(1)}` (see below), the 
algorithm proceeds by alternating
+ * between two steps:
+ *
+ * ===Assignment step:===
+ *
+ * Assign each observation to the cluster whose mean yields the least 
within-cluster sum  of
+ * squares (WCSS). Since the sum of squares is the squared Euclidean 
distance, this is intuitively
+ * the nearest mean. (Mathematically, this means partitioning the 
observations according to the
+ * Voronoi diagram generated by the means).
+ *
+ * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 
\forall j, 1 ≤ j ≤ k}`,
+ * where each `x_p`  is assigned to exactly one `S^{(t)}`, even if it 
could be assigned to two or
+ * more of them.
+ *
+ * ===Update step:===
+ *
+ * Calculate the new means to be the centroids of the observations in the 
new clusters.
+ *
+ * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j`
+ *
+ * Since the arithmetic mean is a least-squares estimator, this also 
minimizes the within-cluster
+ * sum of squares (WCSS) objective.
+ *
+ * @example
+ * {{{
+ *  val trainingDS: DataSet[Vector] = 
env.fromCollection(Clustering.trainingData)
+ *  val initialCentroids: DataSet[LabeledVector] = 
env.fromCollection(Clustering.initCentroids)
+ *
+ *  val kmeans = KMeans()
+ *.setInitialCentroids(initialCentroids)
+ *.setNumIterations(10)
+ *
+ *  kmeans.fit(trainingDS)
+ *
+ *  // getting the computed centroids
+ *  val centroidsResult = kmeans.centroids.get.collect()
+ *
+ *  // get matching clusters for new points
+ *  val testDS: DataSet[Vector] = 
env.fromCollection(Clustering.testData)
+ *  val clusters: DataSet[LabeledVector] = kmeans.predict(testDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]:
+ * Defines the number of iterations to recalculate the centroids of the 
clusters. As it
+ * is a heuristic algorithm, there is no guarantee that it will converge 
to the global optimum. The
+ * centroids of the clusters and the reassignment of the data points will 
be repeated till the
+ * given number of iterations is reached.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]:
+ * 

[jira] [Commented] (FLINK-2138) PartitionCustom for streaming

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605818#comment-14605818
 ] 

ASF GitHub Bot commented on FLINK-2138:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116745707
  
The partitioner function in Scala was simply added as a mirror of the Java 
API.

The batch API is stable; that means at most we can add a Scala function and 
deprecate the partitioner.


 PartitionCustom for streaming
 -

 Key: FLINK-2138
 URL: https://issues.apache.org/jira/browse/FLINK-2138
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Affects Versions: 0.9
Reporter: Márton Balassi
Assignee: Gábor Hermann
Priority: Minor

 The batch API has support for custom partitioning; this should be added for 
 streaming with a similar signature.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...

2015-06-29 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116745707
  
The partitioner function in Scala was simply added as a mirror of the Java 
API.

The batch API is stable; that means at most we can add a Scala function and 
deprecate the partitioner.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2138) PartitionCustom for streaming

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605880#comment-14605880
 ] 

ASF GitHub Bot commented on FLINK-2138:
---

Github user gaborhermann commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116755999
  
Okay, then I will
* deprecate the partitioner implementation in the batch API
* add the function implementation to the batch API
* add the function implementation to the streaming API and remove the 
partitioner implementation (so streaming will only have function 
implementation). As this PR is not merged yet we do not break the streaming API.

Is it okay?
I guess it's worth it. This way Scala users will be able to write more 
concise code and they will not get confused by the overloaded functions because 
the ones with the partitioner will be deprecated.


 PartitionCustom for streaming
 -

 Key: FLINK-2138
 URL: https://issues.apache.org/jira/browse/FLINK-2138
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Affects Versions: 0.9
Reporter: Márton Balassi
Assignee: Gábor Hermann
Priority: Minor

 The batch API has support for custom partitioning; this should be added for 
 streaming with a similar signature.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...

2015-06-29 Thread gaborhermann
Github user gaborhermann commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116755999
  
Okay, then I will
* deprecate the partitioner implementation in the batch API
* add the function implementation to the batch API
* add the function implementation to the streaming API and remove the 
partitioner implementation (so streaming will only have function 
implementation). As this PR is not merged yet we do not break the streaming API.

Is it okay?
I guess it's worth it. This way Scala users will be able to write more 
concise code and they will not get confused by the overloaded functions because 
the ones with the partitioner will be deprecated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2282) Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1

2015-06-29 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605335#comment-14605335
 ] 

Gyula Fora commented on FLINK-2282:
---

Okay

 Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1
 ---

 Key: FLINK-2282
 URL: https://issues.apache.org/jira/browse/FLINK-2282
 Project: Flink
  Issue Type: Task
  Components: Streaming
Affects Versions: 0.9.1
Reporter: Gyula Fora

 Non-grouped reduce/fold/aggregations have been removed for DataStream in 
 the 0.10 snapshot, so these features should be deprecated for the current 
 release, and a note should be added to the javadocs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2282) Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1

2015-06-29 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-2282.
-
Resolution: Not A Problem

 Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1
 ---

 Key: FLINK-2282
 URL: https://issues.apache.org/jira/browse/FLINK-2282
 Project: Flink
  Issue Type: Task
  Components: Streaming
Affects Versions: 0.9.1
Reporter: Gyula Fora

 Non-grouped reduce/fold/aggregations have been removed for DataStream in 
 the 0.10 snapshot, so these features should be deprecated for the current 
 release, and a note should be added to the javadocs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...

2015-06-29 Thread peedeeX21
Github user peedeeX21 commented on the pull request:

https://github.com/apache/flink/pull/700#issuecomment-116850459
  
I am having some trouble fitting our predictor into the new API. 
The problem is that with `PredictOperation` the type of the model has to 
be defined. A `DataSet` of this type is the output of `getModel`. For the 
`predict` method the input is just a single object of this type.

In our case our model is a `DataSet` of `LabeledVector`s (the centroids). 
This means I cannot implement a `PredictOperation` due to that restriction.

For me the API feels a bit inconsistent in that case.

For now I implemented only a `PredictDataSetOperation`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
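
For context, a rough sketch of the workaround described above (assumed names, 
not the PR code; the exact FlinkML pipeline signatures may differ): since the 
model is a DataSet of centroids rather than a single value, prediction can 
broadcast the centroids into a RichMapFunction and assign each test point to 
its nearest centroid there.

```
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration
import org.apache.flink.ml.common.LabeledVector
import org.apache.flink.ml.math.Vector
import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric

import scala.collection.JavaConverters._

// Made-up mapper name; reads the centroid DataSet as a broadcast variable.
class NearestCentroidMapper extends RichMapFunction[Vector, LabeledVector] {
  private val metric = EuclideanDistanceMetric()
  private var centroids: Seq[LabeledVector] = _

  override def open(parameters: Configuration): Unit = {
    centroids = getRuntimeContext
      .getBroadcastVariable[LabeledVector]("centroids").asScala
  }

  override def map(point: Vector): LabeledVector = {
    val nearest = centroids.minBy(c => metric.distance(c.vector, point))
    LabeledVector(nearest.label, point)
  }
}

// usage sketch, inside a PredictDataSetOperation-style method:
// testDS.map(new NearestCentroidMapper).withBroadcastSet(centroids, "centroids")
```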


[jira] [Updated] (FLINK-2066) Make delay between execution retries configurable

2015-06-29 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-2066:
-
Assignee: Nuno Miguel Marques dos Santos

 Make delay between execution retries configurable
 -

 Key: FLINK-2066
 URL: https://issues.apache.org/jira/browse/FLINK-2066
 Project: Flink
  Issue Type: Improvement
  Components: Core
Affects Versions: 0.9
Reporter: Stephan Ewen
Assignee: Nuno Miguel Marques dos Santos
  Labels: starter

 Flink allows to specify a delay between execution retries. This helps to let 
 some external failure causes fully manifest themselves before the restart is 
 attempted.
 The delay is currently defined only system wide.
 We should add it to the {{ExecutionConfig}} of a job to allow per-job 
 specification.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2282) Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1

2015-06-29 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605297#comment-14605297
 ] 

Stephan Ewen commented on FLINK-2282:
-

Minor versions are bug-fix versions, major versions are feature versions. 
Deprecating in minor releases violates semantic versioning (http://semver.org/)

The code should have been deprecated in 0.10.0 and removed for 0.11.0 to follow 
proper standards.

 Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1
 ---

 Key: FLINK-2282
 URL: https://issues.apache.org/jira/browse/FLINK-2282
 Project: Flink
  Issue Type: Task
  Components: Streaming
Affects Versions: 0.9.1
Reporter: Gyula Fora

 Non-grouped reduce/fold/aggregations have been removed for DataStream in 
 the 0.10 snapshot, so these features should be deprecated for the current 
 release, and a note should be added to the javadocs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2289) Make JobManager highly available

2015-06-29 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-2289:


 Summary: Make JobManager highly available
 Key: FLINK-2289
 URL: https://issues.apache.org/jira/browse/FLINK-2289
 Project: Flink
  Issue Type: Improvement
Reporter: Till Rohrmann
Assignee: Till Rohrmann


Currently, the {{JobManager}} is the single point of failure in the Flink 
system. If it fails, then your job cannot be recovered and the Flink cluster is 
no longer able to receive new jobs.

Therefore, it is crucial to make the {{JobManager}} fault tolerant so that the 
Flink cluster can recover from a failed {{JobManager}}. As a first step towards 
this goal, I propose to make the {{JobManager}} highly available by starting 
multiple instances and using Apache ZooKeeper to elect a leader. The leader is 
responsible for the execution of the Flink job. 

In case the {{JobManager}} dies, one of the other running {{JobManager}} 
instances will be elected as the new leader and take over that role. The 
{{Client}} and the {{TaskManager}} will automatically detect the new 
{{JobManager}} by querying the ZooKeeper cluster.

Note that this does not achieve full fault tolerance for the {{JobManager}}, but 
it allows the cluster to recover from a failed {{JobManager}}. The design of 
high-availability for the {{JobManager}} is tracked in the wiki here [1].

Resources:
[1] 
[https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
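
For reference, the ZooKeeper-based leader election described here can be 
sketched with Apache Curator's LeaderLatch recipe (illustration only, not 
Flink code; the connect string and znode path are made up):

```
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.retry.ExponentialBackoffRetry

object LeaderElectionSketch {
  def main(args: Array[String]): Unit = {
    val client = CuratorFrameworkFactory.newClient(
      "localhost:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()

    // every JobManager instance creates a latch on the same znode path
    val latch = new LeaderLatch(client, "/flink/jobmanager-leader")
    latch.start()
    latch.await() // blocks until this instance is elected leader

    // ... act as the leading JobManager; on crash, another latch wins ...

    latch.close()
    client.close()
  }
}
```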


[jira] [Commented] (FLINK-2283) Make grouped reduce/fold/aggregations stateful using Partitioned state

2015-06-29 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605298#comment-14605298
 ] 

Stephan Ewen commented on FLINK-2283:
-

Are these again implemented in Java maps, or are there any considerations about 
out-of-core and managed memory here?

 Make grouped reduce/fold/aggregations stateful using Partitioned state
 --

 Key: FLINK-2283
 URL: https://issues.apache.org/jira/browse/FLINK-2283
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Affects Versions: 0.10
Reporter: Gyula Fora
Priority: Minor

 Currently the inner state of the grouped aggregations are not persisted as an 
 operator state. 
 These operators should be reimplemented to use the newly introduced 
 partitioned state abstractions which will make them fault tolerant and 
 scalable for the future.
 A suggested implementation would be to use a stateful mapper to implement the 
 desired behaviour.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2282) Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1

2015-06-29 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605330#comment-14605330
 ] 

Stephan Ewen commented on FLINK-2282:
-

Since the API is labeled beta, it is okay to make breaking changes.

 Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1
 ---

 Key: FLINK-2282
 URL: https://issues.apache.org/jira/browse/FLINK-2282
 Project: Flink
  Issue Type: Task
  Components: Streaming
Affects Versions: 0.9.1
Reporter: Gyula Fora

 Non-grouped reduce/fold/aggregations have been removed for DataStream in 
 the 0.10 snapshot, so these features should be deprecated for the current 
 release, and a note should be added to the javadocs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2284) Confusing/inconsistent PartitioningStrategy

2015-06-29 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-2284:
---

 Summary: Confusing/inconsistent PartitioningStrategy
 Key: FLINK-2284
 URL: https://issues.apache.org/jira/browse/FLINK-2284
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.9
Reporter: Stephan Ewen


The PartitioningStrategy in 
{{org.apache.flink.streaming.runtime.partitioner.StreamPartitioner.java}} is 
non standard and not easily understandable.

What form of partitioning is `SHUFFLE`? Shuffle just means redistribute; it 
says nothing about what it does. Same with `DISTRIBUTE`. Also `GLOBAL` is not a 
well-defined/established term. Why is `GROUPBY` a partition type? Doesn't 
grouping simply hash-partition (like I assume SHUFFLE means), so why does it 
have an extra entry?

Sticking with principled and established names/concepts is important to allow 
people to collaborate on the code. 

Why not stick with the partitioning types defined in the batch API? They are 
well defined and named:
```
NONE, FORWARD, RANDOM, HASH, RANGE, FORCED_REBALANCE, BROADCAST, CUSTOM
```




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (FLINK-2282) Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1

2015-06-29 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605297#comment-14605297
 ] 

Stephan Ewen edited comment on FLINK-2282 at 6/29/15 8:48 AM:
--

Minor versions are bug-fix versions, major versions are feature versions. 
Deprecating in minor releases violates semantic versioning (http://semver.org/)

The code should have been deprecated in 0.10.0 and removed for 0.11.0 to follow 
proper standards.

(UPDATE: I assume that 0.9, 0.10, ... are actually the equivalent of major 
versions here.)


was (Author: stephanewen):
Minor versions are bug-fix versions, major versions are feature versions. 
Deprecating in minor releases violates semantic versioning (http://semver.org/)

The code should have been deprecated in 0.10.0 and removed for 0.11.0 to follow 
proper standards.

 Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1
 ---

 Key: FLINK-2282
 URL: https://issues.apache.org/jira/browse/FLINK-2282
 Project: Flink
  Issue Type: Task
  Components: Streaming
Affects Versions: 0.9.1
Reporter: Gyula Fora

 Non-grouped reduce/fold/aggregations have been removed for DataStream in 
 the 0.10 snapshot, so these features should be deprecated for the current 
 release, and a note should be added to the javadocs.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2286) Window ParallelMerge sometimes swallows elements of the last window

2015-06-29 Thread JIRA
Márton Balassi created FLINK-2286:
-

 Summary: Window ParallelMerge sometimes swallows elements of the 
last window
 Key: FLINK-2286
 URL: https://issues.apache.org/jira/browse/FLINK-2286
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.9, 0.10
Reporter: Márton Balassi
Assignee: Márton Balassi


Last windows in the stream that do not have parts at all of the parallel 
operator instances get swallowed by the ParallelMerge.

To resolve this, ParallelMerge should be an operator instead of a function, so 
that the close method can access the collector and emit these elements.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2289) Make JobManager highly available

2015-06-29 Thread Ufuk Celebi (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605308#comment-14605308
 ] 

Ufuk Celebi commented on FLINK-2289:


Issue creation race... ;) Can you copy your text to FLINK-2287?

 Make JobManager highly available
 

 Key: FLINK-2289
 URL: https://issues.apache.org/jira/browse/FLINK-2289
 Project: Flink
  Issue Type: Improvement
Reporter: Till Rohrmann
Assignee: Till Rohrmann

 Currently, the {{JobManager}} is the single point of failure in the Flink 
 system. If it fails, then your job cannot be recovered and the Flink cluster 
 is no longer able to receive new jobs.
 Therefore, it is crucial to make the {{JobManager}} fault tolerant so that 
 the Flink cluster can recover from a failed {{JobManager}}. As a first step 
 towards this goal, I propose to make the {{JobManager}} highly available by 
 starting multiple instances and using Apache ZooKeeper to elect a leader. The 
 leader is responsible for the execution of the Flink job. 
 In case the {{JobManager}} dies, one of the other running {{JobManager}} 
 instances will be elected as the new leader and take over that role. The 
 {{Client}} and the {{TaskManager}} will automatically detect the new 
 {{JobManager}} by querying the ZooKeeper cluster.
 Note that this does not achieve full fault tolerance for the {{JobManager}} 
 but it allows the cluster to recover from a failed {{JobManager}}. The design 
 of high-availability for the {{JobManager}} is tracked in the wiki here [1].
 Resources:
 [1] 
 [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (FLINK-2285) Active policy emits elements of the last window twice

2015-06-29 Thread JIRA

 [ 
https://issues.apache.org/jira/browse/FLINK-2285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Márton Balassi reassigned FLINK-2285:
-

Assignee: Márton Balassi

 Active policy emits elements of the last window twice
 -

 Key: FLINK-2285
 URL: https://issues.apache.org/jira/browse/FLINK-2285
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.9, 0.10
Reporter: Márton Balassi
Assignee: Márton Balassi

 The root cause is some duplicate code between the close methods of the 
 GroupedActiveDiscretizer and the GroupedStreamDiscretizer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2285) Active policy emits elements of the last window twice

2015-06-29 Thread JIRA
Márton Balassi created FLINK-2285:
-

 Summary: Active policy emits elements of the last window twice
 Key: FLINK-2285
 URL: https://issues.apache.org/jira/browse/FLINK-2285
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.9, 0.10
Reporter: Márton Balassi


The root cause is some duplicate code between the close methods of the 
GroupedActiveDiscretizer and the GroupedStreamDiscretizer.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2284) Confusing/inconsistent PartitioningStrategy

2015-06-29 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605312#comment-14605312
 ] 

Gyula Fora commented on FLINK-2284:
---

We took these names from Storm, which we saw at that point as the standard for 
streaming concepts.

 Confusing/inconsistent PartitioningStrategy
 ---

 Key: FLINK-2284
 URL: https://issues.apache.org/jira/browse/FLINK-2284
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.9
Reporter: Stephan Ewen

 The PartitioningStrategy in 
 {{org.apache.flink.streaming.runtime.partitioner.StreamPartitioner.java}} is 
 non standard and not easily understandable.
 What form of partitioning is `SHUFFLE`? Shuffle just means redistribute; it 
 says nothing about what it does. Same with `DISTRIBUTE`. Also `GLOBAL` is not 
 a well-defined/established term. Why is `GROUPBY` a partition type? Doesn't 
 grouping simply hash-partition (like I assume SHUFFLE means), so why does 
 it have an extra entry?
 Sticking with principled and established names/concepts is important to allow 
 people to collaborate on the code. 
 Why not stick with the partitioning types defined in the batch API? They are 
 well defined and named:
 ```
 NONE, FORWARD, RANDOM, HASH, RANGE, FORCED_REBALANCE, BROADCAST, CUSTOM
 ```



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-1735] Feature Hasher

2015-06-29 Thread ChristophAl
Github user ChristophAl commented on the pull request:

https://github.com/apache/flink/pull/665#issuecomment-116517528
  
Hi,

After I rebased it on master and implemented the new pipeline interface, I 
have some follow-up questions regarding the types we should accept for feature 
hashing.

I can think of Iterable[String] and Iterable[(String, Int)] for documents, 
as well as (Int, Iterable[String]) and (Int, Iterable[(String, Int)]) for 
documents having some kind of index. So I'm not sure whether it is really 
required to hash arbitrary types by using .hashCode().
Also note that if the nonNegative parameter is set to true, the output can 
be used as the TF in TF-IDF.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
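
For context, a generic sketch of the hashing trick under discussion 
(illustration only, not the PR's FeatureHasher; names are made up). Tokens are 
bucketed by hashCode into a fixed-size count vector; with non-negative counts 
the result can directly serve as the TF part of TF-IDF:

```
object HashingTrickSketch {
  // One document in, one dense count vector of length numFeatures out.
  def hashFeatures(tokens: Iterable[String], numFeatures: Int): Array[Double] = {
    val vec = new Array[Double](numFeatures)
    for (token <- tokens) {
      // force a non-negative bucket index from the (possibly negative) hashCode
      val idx = ((token.hashCode % numFeatures) + numFeatures) % numFeatures
      vec(idx) += 1.0
    }
    vec
  }
}

// e.g. HashingTrickSketch.hashFeatures(Seq("flink", "ml", "flink"), 8)
```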


[jira] [Created] (FLINK-2287) Implement JobManager high availability

2015-06-29 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2287:
--

 Summary: Implement JobManager high availability
 Key: FLINK-2287
 URL: https://issues.apache.org/jira/browse/FLINK-2287
 Project: Flink
  Issue Type: Improvement
  Components: JobManager, TaskManager
Reporter: Ufuk Celebi
 Fix For: 0.10


The problem: The JobManager (JM) is a single point of failure. When it crashes, 
TaskManagers (TM) fail all running jobs and try to reconnect to the same JM. A 
failed JM loses all state and cannot resume the running jobs, even if it 
recovers and the TMs reconnect.

Solution: implement JM fault tolerance/high availability by having multiple JM 
instances running with one as leader and the other(s) in standby. The exact 
coordination and state update protocol between JM, TM, and clients is covered 
in sub-tasks/issues.

Related Wiki: 
https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2230] handling null values for TupleSer...

2015-06-29 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/867#issuecomment-116520207
  
@Shiti No need to apologize, that is all right. We are discussing a tricky 
thing here...

It is true that, right now, one needs to be aware of missing values in the 
application logic. I personally like that: an `Option` makes it very explicit 
and easy to grasp. Also, the byte overhead for the option field is okay, as most 
fields will not be nullable and will not carry this overhead. The serialization 
logic is also more CPU-efficient that way.

What is true is that there is a problem when converting from a `Table` to a 
`DataSet[Tuple]`. Tables with nullable fields could only be converted to data 
sets of class types, which would support nullable fields.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---
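
A minimal sketch of the Option-based style described above (made-up data; 
assumes the Scala API can serialize Option fields, as discussed):

```
import org.apache.flink.api.scala._

object OptionFieldSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // the element type makes missing values explicit
    val rows: DataSet[(Int, Option[String])] = env.fromElements(
      (1, Some("alice")), (2, None), (3, Some("carol")))

    // application logic must handle absence explicitly
    rows.map { case (id, name) => (id, name.getOrElse("<missing>")) }
      .print()
  }
}
```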


[jira] [Commented] (FLINK-2138) PartitionCustom for streaming

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605296#comment-14605296
 ] 

ASF GitHub Bot commented on FLINK-2138:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/872#issuecomment-116528738
  
I actually like this approach. We had the same discussion for the batch API 
and resolved to this, because:

  - You can always chain a `FlatMapFunction` with a `partitionCustom()` 
request to solve all the above situations.
  - This interface allows easy Java8-lambda implementation and it works 
well with the type extraction.
  - It seems to cover the majority of cases more elegantly, as there is no 
need for array wrapping in the user code.


 PartitionCustom for streaming
 -

 Key: FLINK-2138
 URL: https://issues.apache.org/jira/browse/FLINK-2138
 Project: Flink
  Issue Type: New Feature
  Components: Streaming
Affects Versions: 0.9
Reporter: Márton Balassi
Assignee: Gábor Hermann
Priority: Minor

 The batch API has support for custom partitioning; this should be added for 
 streaming with a similar signature.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
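
A small sketch of the chaining pattern from the first point above (made-up 
data and key logic; batch Scala API):

```
import org.apache.flink.api.common.functions.Partitioner
import org.apache.flink.api.scala._

object PartitionCustomSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val words: DataSet[String] = env.fromElements("flink", "streaming", "batch")

    // derive the routing key in a preceding map (or flatMap) ...
    val keyed: DataSet[(Int, String)] = words.map(w => (w.length, w))

    // ... then partitionCustom on that key (field 0 of the tuple)
    val partitioned = keyed.partitionCustom(new Partitioner[Int] {
      override def partition(key: Int, numPartitions: Int): Int =
        key % numPartitions
    }, 0)

    partitioned.print()
  }
}
```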


[jira] [Closed] (FLINK-2281) Allow chaining of operators with multiple inputs

2015-06-29 Thread Gyula Fora (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gyula Fora closed FLINK-2281.
-
Resolution: Later

 Allow chaining of operators with multiple inputs
 

 Key: FLINK-2281
 URL: https://issues.apache.org/jira/browse/FLINK-2281
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Gyula Fora
Assignee: Gyula Fora
Priority: Minor

 Currently only operators with one input can be chained for better 
 performance. In principle there is no reason why this should not be extended 
 to multiple-input operators (where chaining is applicable, i.e. forward 
 connected), to allow maximal speedup.
 The concern usually arises that it might not be good to always chain 
 everything in case of computation-heavy operators. In these cases (if the job 
 is otherwise viable) chaining only increases latency for some operations, 
 while throughput is increased by raising the parallelism to exploit the freed 
 resources.
 In any case, chaining should still remain tunable on an operator level.
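
 For reference, per-operator tuning is already the model on the single-input 
 side; a minimal sketch, assuming the Java API's chaining switches 
 (`startNewChain()` / `disableChaining()`) are mirrored in the Scala 
 streaming API:

```scala
import org.apache.flink.streaming.api.scala._

object ChainingTuningSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.fromElements(1, 2, 3)
      .map(_ * 2).startNewChain()    // cut the chain before this operator
      .map(_ + 1).disableChaining()  // keep a computation-heavy operator out of any chain
      .print()

    env.execute("chaining-tuning-sketch")
  }
}
```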



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2281) Allow chaining of operators with multiple inputs

2015-06-29 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605323#comment-14605323
 ] 

Gyula Fora commented on FLINK-2281:
---

On second thought, this would introduce considerable complexity in the record 
readers and also in the FT implementation, so I am closing this issue for now.

 Allow chaining of operators with multiple inputs
 

 Key: FLINK-2281
 URL: https://issues.apache.org/jira/browse/FLINK-2281
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Gyula Fora
Assignee: Gyula Fora
Priority: Minor

 Currently only operators with one input can be chained for better 
 performance. In principle there is no reason why this should not be extended 
 to multiple-input operators (where chaining is applicable, i.e. forward 
 connected), to allow maximal speedup.
 The concern usually arises that it might not be good to always chain 
 everything in case of computation-heavy operators. In these cases (if the job 
 is otherwise viable) chaining only increases latency for some operations, 
 while throughput is increased by raising the parallelism to exploit the freed 
 resources.
 In any case, chaining should still remain tunable on an operator level.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2281) Allow chaining of operators with multiple inputs

2015-06-29 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605261#comment-14605261
 ] 

Stephan Ewen commented on FLINK-2281:
-

Nice idea, but probably not very critical.

I would assume that most real-world connected inputs involve some repartition 
patterns anyway.

 Allow chaining of operators with multiple inputs
 

 Key: FLINK-2281
 URL: https://issues.apache.org/jira/browse/FLINK-2281
 Project: Flink
  Issue Type: Improvement
  Components: Streaming
Reporter: Gyula Fora
Assignee: Gyula Fora
Priority: Minor

 Currently only operators with one input can be chained for better 
 performance. In principle there is no reason why this should not be extended 
 to multiple-input operators (where chaining is applicable, i.e. forward 
 connected), to allow maximal speedup.
 The concern usually arises that it might not be good to always chain 
 everything in case of computation-heavy operators. In these cases (if the job 
 is otherwise viable) chaining only increases latency for some operations, 
 while throughput is increased by raising the parallelism to exploit the freed 
 resources.
 In any case, chaining should still remain tunable on an operator level.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1735) Add FeatureHasher to machine learning library

2015-06-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605260#comment-14605260
 ] 

ASF GitHub Bot commented on FLINK-1735:
---

Github user ChristophAl commented on the pull request:

https://github.com/apache/flink/pull/665#issuecomment-116517528
  
Hi,

after I rebased it on master and implemented the new pipeline interface, I 
have some follow-up questions regarding the types we should accept for feature 
hashing.

I can think of Iterable[String] and Iterable[(String, Int)] for documents 
as well as (Int, Iterable[String]) and (Int, Iterable[(String, Int)]) for 
documents that carry some kind of index. So I'm not sure whether it is 
required to hash arbitrary types via `.hashCode()`?
Also note that if the `nonNegative` parameter is set to true, the output can 
be used as the TF in TF-IDF.


 Add FeatureHasher to machine learning library
 -

 Key: FLINK-1735
 URL: https://issues.apache.org/jira/browse/FLINK-1735
 Project: Flink
  Issue Type: New Feature
  Components: Machine Learning Library
Reporter: Till Rohrmann
Assignee: Felix Neutatz
  Labels: ML

 Using the hashing trick [1,2] is a common way to vectorize arbitrary feature 
 values. The hash of the feature value is used to calculate its index for a 
 vector entry. In order to mitigate possible collisions, a second hashing 
 function is used to calculate the sign for the update value which is added to 
 the vector entry. This way, it is likely that collisions will simply cancel 
 out.
 A feature hasher would also be helpful for NLP problems where it could be 
 used to vectorize bag-of-words or n-gram feature vectors.
 Resources:
 [1] [https://en.wikipedia.org/wiki/Feature_hashing]
 [2] 
 [http://scikit-learn.org/stable/modules/feature_extraction.html#feature-extraction]
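
 A minimal sketch of the described scheme (illustrative only, not a proposed 
 FeatureHasher API): one hash selects the vector index, and a second, 
 differently seeded hash selects the sign of the update:

```scala
import scala.util.hashing.MurmurHash3

object HashingTrickSketch {
  // The index hash selects the bucket; a second hash with a different seed
  // selects the sign, so colliding features tend to cancel out.
  def hashFeatures(tokens: Iterable[String], numFeatures: Int,
                   nonNegative: Boolean = false): Array[Double] = {
    val vec = new Array[Double](numFeatures)
    for (t <- tokens) {
      val idx  = math.abs(MurmurHash3.stringHash(t) % numFeatures)
      val sign = if (MurmurHash3.stringHash(t, 17) >= 0) 1.0 else -1.0
      vec(idx) += sign
    }
    if (nonNegative) vec.map(math.abs) else vec // abs output is usable as TF
  }

  def main(args: Array[String]): Unit =
    println(hashFeatures(Seq("big", "data", "big"), 8).mkString(" "))
}
```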



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (FLINK-2288) Setup ZooKeeper for distributed coordination

2015-06-29 Thread Ufuk Celebi (JIRA)
Ufuk Celebi created FLINK-2288:
--

 Summary: Setup ZooKeeper for distributed coordination
 Key: FLINK-2288
 URL: https://issues.apache.org/jira/browse/FLINK-2288
 Project: Flink
  Issue Type: Sub-task
Reporter: Ufuk Celebi
Assignee: Ufuk Celebi
 Fix For: 0.10


Having standby JM instances for JobManager high availability requires 
distributed coordination between JM, TM, and clients. For this, we will use 
ZooKeeper (ZK).

Pros:
- Proven solution (other projects use it for this as well)
- Apache TLP with a large community, docs, and a library with the required 
recipes like leader election (see below)

Related Wiki: 
https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
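
For illustration, the leader-election recipe could be used roughly as follows 
(a sketch based on Apache Curator's LeaderLatch; the connect string and znode 
path are made up, not Flink's final configuration):

```scala
import org.apache.curator.framework.CuratorFrameworkFactory
import org.apache.curator.framework.recipes.leader.LeaderLatch
import org.apache.curator.retry.ExponentialBackoffRetry

object JmLeaderElectionSketch {
  def main(args: Array[String]): Unit = {
    // Connect to the ZK ensemble with a standard retry policy.
    val client = CuratorFrameworkFactory.newClient(
      "zk-host:2181", new ExponentialBackoffRetry(1000, 3))
    client.start()

    val latch = new LeaderLatch(client, "/flink/jobmanager-leader")
    latch.start()
    latch.await() // standby JMs block here until they are elected leader

    // ... act as the leading JobManager; if the leader dies, ZK lets one
    // of the standbys proceed past await().
  }
}
```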



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2282) Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1

2015-06-29 Thread Gyula Fora (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605319#comment-14605319
 ] 

Gyula Fora commented on FLINK-2282:
---

You are right, we should have thought about this before the release.

I don't mind just removing them completely without notice; I was just 
suggesting that it may now be better to deprecate them.

 Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1
 ---

 Key: FLINK-2282
 URL: https://issues.apache.org/jira/browse/FLINK-2282
 Project: Flink
  Issue Type: Task
  Components: Streaming
Affects Versions: 0.9.1
Reporter: Gyula Fora

 Non-grouped reduce/fold/aggregations have been removed from DataStream in the 
 0.10 snapshot, so these features should be deprecated for the current 
 release, and a note should be added to the Javadocs.
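
 A minimal, self-contained sketch of what such a deprecation could look like 
 on the Scala side (class, method, and message are illustrative, not the 
 actual DataStream code):

```scala
// Sketch only: the real change would annotate the existing DataStream methods.
class NonGroupedOps[T](elems: Seq[T]) {
  @deprecated("Non-grouped reduce is removed in the 0.10 snapshot; use a grouped reduce instead.", "0.9.1")
  def reduce(f: (T, T) => T): T = elems.reduce(f)
}
```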



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)