[jira] [Commented] (FLINK-2283) Make grouped reduce/fold/aggregations stateful using Partitioned state
[ https://issues.apache.org/jira/browse/FLINK-2283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605416#comment-14605416 ] Stephan Ewen commented on FLINK-2283: - This would be a temporary solution that will most likely be touched again fairly soon. It may make sense to do this now, if the implementation is easy and lightweight such that it does not hurt if the code becomes obsolete in a bit... Make grouped reduce/fold/aggregations stateful using Partitioned state -- Key: FLINK-2283 URL: https://issues.apache.org/jira/browse/FLINK-2283 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 0.10 Reporter: Gyula Fora Priority: Minor Currently the inner state of the grouped aggregations is not persisted as operator state. These operators should be reimplemented to use the newly introduced partitioned state abstractions, which will make them fault tolerant and scalable in the future. A suggested implementation would be to use a stateful mapper to implement the desired behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
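The "stateful mapper" idea from the issue can be sketched outside Flink. In this hedged Python stand-in (the function name and shapes are illustrative, not Flink's API), a plain dict keyed by the group key plays the role of the partitioned state abstraction, which is what would make the operator checkpointable and rescalable:

```python
# Hypothetical sketch of FLINK-2283's suggestion: a grouped (keyed) reduce
# expressed as a stateful map. The dict below stands in for Flink's
# partitioned (per-key) state; names and signatures are illustrative.

def stateful_grouped_reduce(events, key_fn, reduce_fn):
    state = {}  # per-key partitioned state: one accumulator slot per group key
    out = []
    for value in events:
        k = key_fn(value)
        acc = state.get(k)
        state[k] = value if acc is None else reduce_fn(acc, value)
        out.append((k, state[k]))  # emit the updated aggregate downstream
    return out

# Running sum per key:
result = stateful_grouped_reduce(
    [("a", 1), ("b", 2), ("a", 3)],
    key_fn=lambda e: e[0],
    reduce_fn=lambda acc, e: (acc[0], acc[1] + e[1]),
)
```

Because all state lives in the per-key map, a fault-tolerance mechanism only has to snapshot `state`, which is the point of moving these operators onto partitioned state.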
[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...
Github user gaborhermann commented on the pull request: https://github.com/apache/flink/pull/872#issuecomment-116647912 By the way, in the Scala DataSet the user should specify the Java `Partitioner[K]` class. Wouldn't it be more convenient to wrap a function like `(K, Int) => Int` into a `Partitioner[K]`, similarly to the KeySelector? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
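The wrapping gaborhermann proposes can be sketched in Python. `FunctionPartitioner` and `partition_custom` are hypothetical names, and the `partition(key, num_partitions)` shape only loosely mirrors Flink's Java `Partitioner[K]` interface:

```python
# Hedged sketch: wrap a plain (key, numPartitions) -> partitionIndex function
# in a Partitioner-style object, the way KeySelector wraps a plain key
# function. Minimal Python stand-in, not Flink's actual API.

class FunctionPartitioner:
    def __init__(self, fn):
        self.fn = fn  # the user's (key, num_partitions) -> int function

    def partition(self, key, num_partitions):
        return self.fn(key, num_partitions)

def partition_custom(fn_or_partitioner):
    # Accept either variant: wrap bare functions, pass partitioners through.
    if callable(fn_or_partitioner) and not hasattr(fn_or_partitioner, "partition"):
        return FunctionPartitioner(fn_or_partitioner)
    return fn_or_partitioner

p = partition_custom(lambda key, n: hash(key) % n)
```

The API surface stays a single `Partitioner`-shaped type; whether the user handed in a function or an object is an internal detail.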
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605571#comment-14605571 ] ASF GitHub Bot commented on FLINK-1731: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33460336 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ +
+import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on a set of training data + * points and a set of k initial centroids. 
+ * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. + * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. 
+ * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + * .setInitialCentroids(initialCentroids) + * .setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + *
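The assignment and update steps quoted in the scaladoc above amount to one Lloyd iteration. A minimal plain-Python sketch (tuples for vectors, squared Euclidean distance, as in the documentation):

```python
# One k-means (Lloyd) iteration, matching the scaladoc's two steps.

def kmeans_step(points, centroids):
    # Assignment step: each point joins the cluster of its nearest mean,
    # i.e. the one minimizing the squared Euclidean distance (WCSS).
    def nearest(p):
        return min(range(len(centroids)),
                   key=lambda i: sum((a - b) ** 2 for a, b in zip(p, centroids[i])))
    clusters = [[] for _ in centroids]
    for p in points:
        clusters[nearest(p)].append(p)
    # Update step: each centroid moves to the arithmetic mean of its points
    # (empty clusters keep their old centroid).
    return [tuple(sum(coord) / len(cluster) for coord in zip(*cluster)) if cluster else m
            for cluster, m in zip(clusters, centroids)]

centroids = kmeans_step(
    [(0.0, 0.0), (0.0, 1.0), (10.0, 10.0), (10.0, 11.0)],
    [(0.0, 0.0), (10.0, 10.0)],
)
# -> [(0.0, 0.5), (10.0, 10.5)]
```

Repeating `kmeans_step` for the configured `NumIterations` yields the behaviour the parameter list describes.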
[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33462036 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on a set of training data + * points and a set of k initial centroids. + * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. 
+ * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + * .setInitialCentroids(initialCentroids) + * .setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. 
As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + * Defines the initial k centroids of the k clusters. They are used as the starting point of the + * algorithm for clustering the data set. The centroids are recalculated as often as set in + *
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605612#comment-14605612 ] ASF GitHub Bot commented on FLINK-1731: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33463484 --- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/Clustering.scala --- @@ -0,0 +1,256 @@ +/* --- End diff -- Rename to ClusteringData.scala Add kMeans clustering algorithm to machine learning library --- Key: FLINK-1731 URL: https://issues.apache.org/jira/browse/FLINK-1731 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Peter Schrott Labels: ML The Flink repository already contains a kMeans implementation but it is not yet ported to the machine learning library. I assume that only the used data types have to be adapted and then it can be more or less directly moved to flink-ml. The kMeans++ [1] and the kMeans|| [2] algorithms constitute a better implementation because they improve the initial seeding phase to achieve near-optimal clustering. It might be worthwhile to implement kMeans||. Resources: [1] http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf [2] http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf
[jira] [Commented] (FLINK-2138) PartitionCustom for streaming
[ https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605634#comment-14605634 ] ASF GitHub Bot commented on FLINK-2138: --- Github user gaborhermann commented on the pull request: https://github.com/apache/flink/pull/872#issuecomment-116671285 I'd prefer the function implementation (like `(K, Int) => Int`), but it should stay consistent with the batch API. I don't see why the wrapping would affect the compatibility checking of the partitioning. Is it okay if I change it to the function implementation in both (Scala batch, Scala streaming) APIs? If not, then let's just stick with the partitioner implementation in the APIs. PartitionCustom for streaming - Key: FLINK-2138 URL: https://issues.apache.org/jira/browse/FLINK-2138 Project: Flink Issue Type: New Feature Components: Streaming Affects Versions: 0.9 Reporter: Márton Balassi Assignee: Gábor Hermann Priority: Minor The batch API has support for custom partitioning; this should be added for streaming with a similar signature.
[jira] [Commented] (FLINK-2138) PartitionCustom for streaming
[ https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605570#comment-14605570 ] ASF GitHub Bot commented on FLINK-2138: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/872#issuecomment-116649358 In the batch API, equality of the partitioners is used to determine compatibility of the partitioning. This may at some point become interesting for the streaming API as well. In any case, let's pick one of the two variants (function or partitioner implementation). Overloading the methods too much with equally powerful variants inevitably confuses users. PartitionCustom for streaming - Key: FLINK-2138 URL: https://issues.apache.org/jira/browse/FLINK-2138 Project: Flink Issue Type: New Feature Components: Streaming Affects Versions: 0.9 Reporter: Márton Balassi Assignee: Gábor Hermann Priority: Minor The batch API has support for custom partitioning; this should be added for streaming with a similar signature.
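Why partitioner equality matters for compatibility can be illustrated with a small hedged Python sketch (a hypothetical compatibility check, not Flink's optimizer code): a dedicated Partitioner class can define a meaningful equality, while two separately created function wrappers never compare equal, so the planner would always conservatively re-shuffle:

```python
# Hypothetical illustration of Stephan's point about partitioner equality.

class ModPartitioner:
    """All instances partition identically, so they compare equal."""
    def partition(self, key, n):
        return key % n
    def __eq__(self, other):
        return type(self) is type(other)
    def __hash__(self):
        return hash(type(self))

def needs_reshuffle(left, right):
    # Sketch of a compatibility check: if both inputs were partitioned by
    # equal partitioners, the data is already laid out the same way and the
    # re-shuffle can be skipped.
    return left != right
```

Two distinct lambdas, even textually identical ones, compare by identity, which is one argument for keeping the explicit partitioner variant.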
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605613#comment-14605613 ] ASF GitHub Bot commented on FLINK-1731: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33463634 --- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/Clustering.scala --- @@ -0,0 +1,256 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import breeze.linalg.{DenseVector => BreezeDenseVector, Vector => BreezeVector} +import org.apache.flink.ml.common.LabeledVector +import org.apache.flink.ml.math.{DenseVector, Vector} + +/** + * Training and test data sets for the K-Means implementation + * [[org.apache.flink.ml.clustering.KMeans]]. + */ +object Clustering { + + /* + * Number of iterations for the K-Means algorithm. + */ + val iterations = 10 + + /* + * Sequence of initial centroids. 
+ */ + val centroidData: Seq[LabeledVector] = Seq( +LabeledVector(1, DenseVector(-0.1369104662767052, 0.2949172396037093, -0.01070450818187003)), +LabeledVector(2, DenseVector(0.43643950041582885, 0.30117329671833215, 0.20965108353159922)), +LabeledVector(3, DenseVector(0.26011627041438423, 0.22954649683337805, 0.2936286262276151)), +LabeledVector(4, DenseVector(-0.041980932305508145, 0.03116256923634109, 0.31065743174542293)), +LabeledVector(5, DenseVector(0.0984398491976613, -0.21227718242541602, -0.45083084300074255)), +LabeledVector(6, DenseVector(-0.216526923545, -0.47142840804338293, -0.02298954070830948)), +LabeledVector(7, DenseVector(-0.0632307695567563, 0.2387221400443612, 0.09416850805771804)), +LabeledVector(8, DenseVector(0.16383680898916775, -0.24586810465119346, 0.08783590589294081)), +LabeledVector(9, DenseVector(-0.24763544645492513, 0.19688995732231254, 0.4520904742796472)), +LabeledVector(10, DenseVector(0.16468044138881932, 0.06259522206982082, 0.12145870313604247)) + + ) + + /* + * 3 Dimensional DenseVectors from a Part of Cosmo-Gas Dataset + * Reference: http://nuage.cs.washington.edu/benchmark/ + */ + val trainingData: Seq[Vector] = Seq( +DenseVector(-0.489811986685, 0.496883004904, -0.483860999346), +DenseVector(-0.485296010971, 0.496421992779, -0.484212994576), +DenseVector(-0.481514006853, 0.496134012938, -0.48508900404), +DenseVector(-0.47854255, 0.496246010065, -0.486301004887), +DenseVector(-0.475461006165, 0.496093004942, -0.487686008215), +DenseVector(-0.471846997738, 0.496558994055, -0.488242000341), +DenseVector(-0.467496991158, 0.497166007757, -0.48861899972), +DenseVector(-0.463036000729, 0.497680991888, -0.489721000195), +DenseVector(-0.458972990513, 0.4984369874, -0.490575999022), +DenseVector(-0.455772012472, 0.499684005976, -0.491737008095), +DenseVector(-0.453074991703, -0.499433010817, -0.492006987333), +DenseVector(-0.450913995504, -0.499316990376, -0.492769002914), +DenseVector(-0.448724985123, -0.499406009912, 
-0.493508011103), +DenseVector(-0.44715899229, -0.499680995941, -0.494500011206), +DenseVector(-0.445362001657, -0.499630987644, -0.495151996613), +DenseVector(-0.442811012268, -0.499303996563, -0.495151013136), +DenseVector(-0.439810991287, -0.499332994223, -0.49529799819), +DenseVector(-0.43678098917, -0.499361991882, -0.49545699358), +DenseVector(-0.433919012547, -0.499334007502, -0.495705991983), +DenseVector(-0.43117800355, -0.499345004559, -0.496196985245), +DenseVector(-0.428333997726, -0.499083012342, -0.496385991573), +DenseVector(-0.425300985575, -0.49844199419, -0.496405988932), +DenseVector(-0.421882003546, -0.497743010521, -0.496706992388), +
[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...
Github user thvasilo commented on the pull request: https://github.com/apache/flink/pull/700#issuecomment-116669400 Another note: it should not be necessary for the user to provide the initial centroids; it should be possible to generate them from the algorithm itself, ideally with a scheme like kmeans++.
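The suggested k-means++ seeding can be sketched in plain Python. The function name and signature below are illustrative; the scheme itself follows the D² weighting of Arthur and Vassilvitskii's k-means++ paper cited in the JIRA issue:

```python
import random

# Hedged sketch of k-means++ seeding: the first centroid is drawn uniformly
# from the data, and each further centroid is drawn with probability
# proportional to D(x)^2, the squared distance to the nearest chosen centroid.

def kmeans_pp_seed(points, k, rng=random):
    centroids = [rng.choice(points)]
    while len(centroids) < k:
        d2 = [min(sum((a - b) ** 2 for a, b in zip(p, c)) for c in centroids)
              for p in points]
        r = rng.uniform(0, sum(d2))  # weighted sample via cumulative sums
        acc = 0.0
        for p, w in zip(points, d2):
            acc += w
            if acc >= r:
                centroids.append(p)
                break
    return centroids
```

This would let `KMeans` derive `InitialCentroids` from the training DataSet itself instead of requiring them as a parameter, which is the point of thvasilo's note.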
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605607#comment-14605607 ] ASF GitHub Bot commented on FLINK-1731: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33463192 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
+ * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. + * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. 
+ * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + * .setInitialCentroids(initialCentroids) + * .setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated until the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + *
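The assignment and update steps documented above can be sketched in plain Scala, with no Flink dependency; the names (`KMeansSketch`, `step`) are illustrative only and not part of the PR under review:

```scala
// Minimal sketch of one KMeans iteration over 2-D points, assuming plain
// Scala collections rather than Flink DataSets.
object KMeansSketch {
  type Point = (Double, Double)

  // Squared Euclidean distance, the WCSS building block from the Scaladoc.
  def sqDist(a: Point, b: Point): Double = {
    val (dx, dy) = (a._1 - b._1, a._2 - b._2)
    dx * dx + dy * dy
  }

  // Assignment step: each point goes to the index of its nearest centroid.
  def assign(points: Seq[Point], centroids: Seq[Point]): Map[Int, Seq[Point]] =
    points.groupBy(p => centroids.indices.minBy(i => sqDist(p, centroids(i))))

  // Update step: the new centroid is the arithmetic mean of its assigned points.
  def update(clusters: Map[Int, Seq[Point]], old: Seq[Point]): Seq[Point] =
    old.indices.map { i =>
      clusters.get(i) match {
        case Some(ps) =>
          val n = ps.size.toDouble
          (ps.map(_._1).sum / n, ps.map(_._2).sum / n)
        case None => old(i) // an empty cluster keeps its previous centroid
      }
    }

  def step(points: Seq[Point], centroids: Seq[Point]): Seq[Point] =
    update(assign(points, centroids), centroids)
}
```

Iterating `step` a fixed number of times corresponds to the `NumIterations` parameter described above; the real implementation would run this over DataSets via bulk iterations instead of local collections.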
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605627#comment-14605627 ] ASF GitHub Bot commented on FLINK-1731: --- Github user peedeeX21 commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33465223 --- Diff: flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/clustering/Clustering.scala --- @@ -0,0 +1,256 @@ +/* --- End diff -- done Add kMeans clustering algorithm to machine learning library --- Key: FLINK-1731 URL: https://issues.apache.org/jira/browse/FLINK-1731 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Peter Schrott Labels: ML The Flink repository already contains a kMeans implementation but it is not yet ported to the machine learning library. I assume that only the used data types have to be adapted and then it can be more or less directly moved to flink-ml. The kMeans++ [1] and the kMeans|| [2] algorithms constitute a better implementation because they improve the initial seeding phase to achieve near-optimal clustering. It might be worthwhile to implement kMeans||. Resources: [1] http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf [2] http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf
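The improved seeding the issue refers to can be illustrated with a small kMeans++ sketch: after a uniformly random first centroid, each further centroid is drawn with probability proportional to D(x)², the squared distance to the nearest centroid chosen so far. This uses plain Scala collections with illustrative names, not the eventual flink-ml API:

```scala
import scala.util.Random

// Hedged sketch of kMeans++ seeding [1] on local collections.
object KMeansPlusPlusSketch {
  type Point = (Double, Double)

  private def sqDist(a: Point, b: Point): Double = {
    val (dx, dy) = (a._1 - b._1, a._2 - b._2)
    dx * dx + dy * dy
  }

  def seed(points: Seq[Point], k: Int, rnd: Random): Seq[Point] =
    (1 until k).foldLeft(Vector(points(rnd.nextInt(points.size)))) { (centroids, _) =>
      // D(x)^2 for every point; already-chosen points get weight 0.
      val weights = points.map(p => centroids.map(c => sqDist(p, c)).min)
      // Sample an index with probability proportional to its weight.
      val cumulative = weights.scanLeft(0.0)(_ + _).tail
      val r = rnd.nextDouble() * weights.sum
      centroids :+ points(cumulative.indexWhere(_ > r))
    }
}
```

kMeans|| follows the same idea but oversamples several candidates per round so the seeding parallelizes, which is why the issue suggests it for a distributed setting.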
[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...
Github user gaborhermann commented on the pull request: https://github.com/apache/flink/pull/872#issuecomment-116671285 I'd prefer the function implementation (like `(K, Int) => Int`), but it should stay consistent with the batch API. I don't see why the wrapping would affect the compatibility checking of the partitioning. Is it okay if I change it to the function implementation in both (Scala batch, Scala streaming) APIs? If not, then let's just stick with the partitioner implementation in the APIs.
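The wrapping proposed here could look like the sketch below: adapting a plain `(K, Int) => Int` function into a `Partitioner[K]`, analogous to the KeySelector wrapping. The trait is a self-contained stand-in mirroring `org.apache.flink.api.common.functions.Partitioner` so the snippet runs without Flink on the classpath; the object and method names are hypothetical:

```scala
import scala.language.implicitConversions

object PartitionerWrapSketch {
  // Stand-in for Flink's Partitioner interface: maps a key to a channel index.
  trait Partitioner[K] extends Serializable {
    def partition(key: K, numPartitions: Int): Int
  }

  // The Scala API could accept a plain function and wrap it on the fly,
  // so users need not implement the Java interface by hand.
  implicit def functionToPartitioner[K](fn: (K, Int) => Int): Partitioner[K] =
    new Partitioner[K] {
      override def partition(key: K, numPartitions: Int): Int =
        fn(key, numPartitions)
    }
}
```

Since the wrapper just delegates, the underlying `Partitioner` instance is what the runtime sees either way, which supports the point that wrapping should not affect partitioning-compatibility checks.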
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605573#comment-14605573 ] ASF GitHub Bot commented on FLINK-1731: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33460529 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
+ * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. + * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. 
+ * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + * .setInitialCentroids(initialCentroids) + * .setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated until the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + *
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605585#comment-14605585 ] ASF GitHub Bot commented on FLINK-1731: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33461173 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
+ * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. + * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. 
+ * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + * .setInitialCentroids(initialCentroids) + * .setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated until the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + *
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605598#comment-14605598 ] ASF GitHub Bot commented on FLINK-1731: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33462286 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
+ * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. + * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. 
+ * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabeledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + * .setInitialCentroids(initialCentroids) + * .setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated until the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + *
[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...
Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33462286 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. + * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. 
+ * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + *.setInitialCentroids(initialCentroids) + *.setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters.
As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + * Defines the initial k centroids of the k clusters. They are used as start off point of the + * algorithm for clustering the data set. The centroids are recalculated as often as set in + *
[jira] [Commented] (FLINK-2008) PersistentKafkaSource is sometimes emitting tuples multiple times
[ https://issues.apache.org/jira/browse/FLINK-2008?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605544#comment-14605544 ] Robert Metzger commented on FLINK-2008: --- I'm looking into these issues now ... PersistentKafkaSource is sometimes emitting tuples multiple times - Key: FLINK-2008 URL: https://issues.apache.org/jira/browse/FLINK-2008 Project: Flink Issue Type: Bug Components: Kafka Connector, Streaming Affects Versions: 0.9 Reporter: Robert Metzger Assignee: Robert Metzger The PersistentKafkaSource is expected to emit records exactly once. Two test cases of the KafkaITCase are sporadically failing because records are emitted multiple times. Affected tests: {{testPersistentSourceWithOffsetUpdates()}}, after the offsets have been changed manually in ZK: {code} java.lang.RuntimeException: Expected v to be 3, but was 4 on element 0 array=[4, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 2] {code} {{brokerFailureTest()}} also fails: {code} 05/13/2015 08:13:16 Custom source - Stream Sink(1/1) switched to FAILED java.lang.AssertionError: Received tuple with value 21 twice at org.junit.Assert.fail(Assert.java:88) at org.junit.Assert.assertTrue(Assert.java:41) at org.junit.Assert.assertFalse(Assert.java:64) at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:877) at org.apache.flink.streaming.connectors.kafka.KafkaITCase$15.invoke(KafkaITCase.java:859) at org.apache.flink.streaming.api.operators.StreamSink.callUserFunction(StreamSink.java:39) at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137) at org.apache.flink.streaming.api.operators.ChainableStreamOperator.collect(ChainableStreamOperator.java:54) at org.apache.flink.streaming.api.collector.CollectorWrapper.collect(CollectorWrapper.java:39) at 
org.apache.flink.streaming.connectors.kafka.api.persistent.PersistentKafkaSource.run(PersistentKafkaSource.java:173) at org.apache.flink.streaming.api.operators.StreamSource.callUserFunction(StreamSource.java:40) at org.apache.flink.streaming.api.operators.StreamOperator.callUserFunctionAndLogException(StreamOperator.java:137) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:34) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:139) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.lang.Thread.run(Thread.java:745) {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605595#comment-14605595 ] ASF GitHub Bot commented on FLINK-1731: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33462036 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
+ * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. + * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. 
+ * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + *.setInitialCentroids(initialCentroids) + *.setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + *
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605615#comment-14605615 ] ASF GitHub Bot commented on FLINK-1731: --- Github user thvasilo commented on the pull request: https://github.com/apache/flink/pull/700#issuecomment-116662421 Hello, I've left some initial comments. Once those have been addressed I'll try to do some more integration testing and then pass the review over to a committer. Add kMeans clustering algorithm to machine learning library --- Key: FLINK-1731 URL: https://issues.apache.org/jira/browse/FLINK-1731 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Peter Schrott Labels: ML The Flink repository already contains a kMeans implementation but it is not yet ported to the machine learning library. I assume that only the used data types have to be adapted and then it can be more or less directly moved to flink-ml. The kMeans++ [1] and the kMeans|| [2] algorithm constitute a better implementation because they improve the initial seeding phase to achieve near optimal clustering. It might be worthwhile to implement kMeans||. Resources: [1] http://ilpubs.stanford.edu:8090/778/1/2006-13.pdf [2] http://theory.stanford.edu/~sergei/papers/vldb12-kmpar.pdf -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...
Github user thvasilo commented on the pull request: https://github.com/apache/flink/pull/700#issuecomment-116662421 Hello, I've left some initial comments. Once those have been addressed I'll try to do some more integration testing and then pass the review over to a committer. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605638#comment-14605638 ] ASF GitHub Bot commented on FLINK-1731: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33466208 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
+ * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. + * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. 
+ * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + *.setInitialCentroids(initialCentroids) + *.setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + *
[jira] [Commented] (FLINK-2200) Flink API with Scala 2.11 - Maven Repository
[ https://issues.apache.org/jira/browse/FLINK-2200?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605502#comment-14605502 ] Chiwan Park commented on FLINK-2200: Hi, I'm working on this issue. I found a problem: lots of modules are affected by the Scala version variation. Since flink-runtime and flink-clients depend on the Scala version, almost all Flink modules end up affected by this change. I know we discussed and concluded that only the names of the affected modules should be changed, but I think we have to rethink this. Flink API with Scala 2.11 - Maven Repository Key: FLINK-2200 URL: https://issues.apache.org/jira/browse/FLINK-2200 Project: Flink Issue Type: Wish Components: Build System, Scala API Reporter: Philipp Götze Assignee: Chiwan Park Priority: Trivial Labels: maven It would be nice if you could upload a pre-built version of the Flink API with Scala 2.11 to the maven repository. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2289) Make JobManager highly available
[ https://issues.apache.org/jira/browse/FLINK-2289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605534#comment-14605534 ] Till Rohrmann commented on FLINK-2289: -- Sorry, I didn't see your issue. Will do. Make JobManager highly available Key: FLINK-2289 URL: https://issues.apache.org/jira/browse/FLINK-2289 Project: Flink Issue Type: Improvement Reporter: Till Rohrmann Assignee: Till Rohrmann Currently, the {{JobManager}} is the single point of failure in the Flink system. If it fails, then your job cannot be recovered and the Flink cluster is no longer able to receive new jobs. Therefore, it is crucial to make the {{JobManager}} fault tolerant so that the Flink cluster can recover from failed {{JobManager}}. As a first step towards this goal, I propose to make the {{JobManager}} highly available by starting multiple instances and using Apache ZooKeeper to elect a leader. The leader is responsible for the execution of the Flink job. In case that the {{JobManager}} dies, one of the other running {{JobManager}} will be elected as the leader and take over the role of the leader. The {{Client}} and the {{TaskManager}} will automatically detect the new {{JobManager}} by querying the ZooKeeper cluster. Note that this does not achieve full fault tolerance for the {{JobManager}} but it allows the cluster to recover from failed {{JobManager}}. The design of high-availability for the {{JobManager}} is tracked in the wiki here [1]. Resources: [1] [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2284) Confusing/inconsistent PartitioningStrategy
[ https://issues.apache.org/jira/browse/FLINK-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605353#comment-14605353 ] Matthias J. Sax commented on FLINK-2284: It is documented in {{org.apache.flink.streaming.api.datastream.DataStream}}. And this is the API users see. Confusing/inconsistent PartitioningStrategy --- Key: FLINK-2284 URL: https://issues.apache.org/jira/browse/FLINK-2284 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.9 Reporter: Stephan Ewen The PartitioningStrategy in {{org.apache.flink.streaming.runtime.partitioner.StreamPartitioner.java}} is non standard and not easily understandable. What form of partitioning is `SHUFFLE`? Shuffle just means redistribute, it says nothing about what it does. Same with `DISTRIBUTE`. Also `GLOBAL` is not a well-defined/established term. Why is `GROUPBY` a partition type? Doesn't grouping simply hash partition (like I assume SHUFFLE means), so why does it have an extra entry? Sticking with principled and established names/concepts is important to allow people to collaborate on the code. Why not stick with the partitioning types defined in the batch API? They are well defined and named: ``` NONE, FORWARD, RANDOM, HASH, RANGE, FORCED_REBALANCE, BROADCAST, CUSTOM ``` -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (FLINK-2290) CoRecordReader Does Not Read Events From Both Inputs When No Elements Arrive
[ https://issues.apache.org/jira/browse/FLINK-2290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-2290: --- Assignee: Aljoscha Krettek CoRecordReader Does Not Read Events From Both Inputs When No Elements Arrive Key: FLINK-2290 URL: https://issues.apache.org/jira/browse/FLINK-2290 Project: Flink Issue Type: Bug Components: Streaming Reporter: Aljoscha Krettek Assignee: Aljoscha Krettek When no elements arrive the reader will always try to read from the same input index. This means that it will only process elements form this input. This could be problematic with Watermarks/Checkpoint barriers. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2291) Use ZooKeeper to elect JobManager leader and send information to TaskManagers
Till Rohrmann created FLINK-2291: Summary: Use ZooKeeper to elect JobManager leader and send information to TaskManagers Key: FLINK-2291 URL: https://issues.apache.org/jira/browse/FLINK-2291 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Affects Versions: 0.10 Reporter: Till Rohrmann Assignee: Till Rohrmann Use ZooKeeper to determine the leader of a set of {{JobManager}}s which will act as the responsible {{JobManager}} for all {{TaskManager}}. The {{TaskManager}} will get the address of the leader from ZooKeeper. Related Wiki: [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
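The scheme described in FLINK-2291 follows ZooKeeper's standard leader-election recipe: each JobManager registers a sequential ephemeral node, the holder of the smallest live sequence number leads, and TaskManagers read the current leader's address from ZooKeeper. A minimal in-memory stand-in for that recipe (purely illustrative, not Flink's or ZooKeeper's actual API) looks like this:

```python
class LeaderElection:
    """In-memory stand-in for ZooKeeper's sequential-ephemeral-node election.

    Each participant (here, a JobManager) registers and receives a
    monotonically increasing sequence number; the participant holding the
    smallest live number is the leader. When the leader's session dies,
    its node vanishes and the next-smallest survivor becomes the leader;
    clients and TaskManagers simply re-read the current leader.
    """

    def __init__(self):
        self._seq = 0
        self._nodes = {}  # sequence number -> participant name

    def register(self, name):
        """Create a sequential 'node' for a participant; returns its sequence number."""
        self._seq += 1
        self._nodes[self._seq] = name
        return self._seq

    def leader(self):
        """The participant with the smallest live sequence number, or None."""
        return self._nodes[min(self._nodes)] if self._nodes else None

    def fail(self, seq):
        """Simulate a session death: the ephemeral node disappears."""
        self._nodes.pop(seq, None)
```

In real ZooKeeper the "re-read" is event-driven (each participant watches the node just below its own), which avoids a herd of watchers on the leader node; the sketch omits watches for brevity.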
[jira] [Commented] (FLINK-2283) Make grouped reduce/fold/aggregations stateful using Partitioned state
[ https://issues.apache.org/jira/browse/FLINK-2283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605357#comment-14605357 ] Gyula Fora commented on FLINK-2283: --- I would use stateful java maps using PartitionedState for now. I see several reasons to do this instead of more complex out-of-core implementations:
- State will be properly checkpointed with no additional implementation
- We can use the state backend (if necessary) to handle out-of-core state (this is probably a perfect candidate for lazy state fetching)
- This implementation will scale easily if we implement it for the partitioned state
- It's a trivial implementation, while managed memory will probably be a lot of overhead
Make grouped reduce/fold/aggregations stateful using Partitioned state -- Key: FLINK-2283 URL: https://issues.apache.org/jira/browse/FLINK-2283 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 0.10 Reporter: Gyula Fora Priority: Minor Currently the inner state of the grouped aggregations are not persisted as an operator state. These operators should be reimplemented to use the newly introduced partitioned state abstractions which will make them fault tolerant and scalable for the future. A suggested implementation would be to use a stateful mapper to implement the desired behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
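The "stateful mapper" idea suggested in FLINK-2283 can be illustrated with a per-key state map. This is only a sketch of the concept (the real operator would hold its map in Flink's partitioned state abstraction, so checkpointing and rescaling come from the state backend rather than the operator):

```python
class GroupedReduceMapper:
    """Stateful mapper emulating a grouped streaming reduce.

    Keeps one accumulator per key, the way a mapper backed by partitioned
    state would; a grouped reduce emits the running result for the
    element's key on every input.
    """

    def __init__(self, reduce_fn):
        self._reduce = reduce_fn
        self._state = {}  # key -> current accumulator (the "partitioned state")

    def map(self, key, value):
        if key in self._state:
            self._state[key] = self._reduce(self._state[key], value)
        else:
            # First element of a group becomes its initial accumulator.
            self._state[key] = value
        return self._state[key]
```

For example, `GroupedReduceMapper(lambda a, b: a + b)` turns a keyed stream into per-key running sums; swapping the reduce function covers fold-like and aggregation behaviour as well.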
[jira] [Closed] (FLINK-2289) Make JobManager highly available
[ https://issues.apache.org/jira/browse/FLINK-2289?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-2289. Resolution: Duplicate See FLINK-2287 Make JobManager highly available Key: FLINK-2289 URL: https://issues.apache.org/jira/browse/FLINK-2289 Project: Flink Issue Type: Improvement Reporter: Till Rohrmann Assignee: Till Rohrmann Currently, the {{JobManager}} is the single point of failure in the Flink system. If it fails, then your job cannot be recovered and the Flink cluster is no longer able to receive new jobs. Therefore, it is crucial to make the {{JobManager}} fault tolerant so that the Flink cluster can recover from failed {{JobManager}}. As a first step towards this goal, I propose to make the {{JobManager}} highly available by starting multiple instances and using Apache ZooKeeper to elect a leader. The leader is responsible for the execution of the Flink job. In case that the {{JobManager}} dies, one of the other running {{JobManager}} will be elected as the leader and take over the role of the leader. The {{Client}} and the {{TaskManager}} will automatically detect the new {{JobManager}} by querying the ZooKeeper cluster. Note that this does not achieve full fault tolerance for the {{JobManager}} but it allows the cluster to recover from failed {{JobManager}}. The design of high-availability for the {{JobManager}} is tracked in the wiki here [1]. Resources: [1] [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2285) Active policy emits elements of the last window twice
[ https://issues.apache.org/jira/browse/FLINK-2285?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605340#comment-14605340 ] ASF GitHub Bot commented on FLINK-2285: --- GitHub user mbalassi opened a pull request: https://github.com/apache/flink/pull/873 [FLINK-2285] [streaming] Removed duplicate call in close from GroupedActiveDiscretizer You can merge this pull request into a Git repository by running: $ git pull https://github.com/mbalassi/flink flink-2285 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/873.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #873 commit 1065a2893afd2c342ccc8949928273862426162c Author: mbalassi mbala...@apache.org Date: 2015-06-29T09:17:21Z [FLINK-2285] [streaming] Removed duplicate call in close from GroupedActiveDiscretizer commit e0bd71a2589ccd7e90c6ca657587749fcd56d766 Author: mbalassi mbala...@apache.org Date: 2015-06-29T09:21:23Z [streaming] Minor streaming code cleanups Active policy emits elements of the last window twice - Key: FLINK-2285 URL: https://issues.apache.org/jira/browse/FLINK-2285 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.9, 0.10 Reporter: Márton Balassi Assignee: Márton Balassi The root cause is some duplicate code between the close methods of the GroupedActiveDiscretizer and the GroupedStreamDiscretizer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2285] [streaming] Removed duplicate cal...
GitHub user mbalassi opened a pull request: https://github.com/apache/flink/pull/873 [FLINK-2285] [streaming] Removed duplicate call in close from GroupedActiveDiscretizer You can merge this pull request into a Git repository by running: $ git pull https://github.com/mbalassi/flink flink-2285 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/873.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #873 commit 1065a2893afd2c342ccc8949928273862426162c Author: mbalassi mbala...@apache.org Date: 2015-06-29T09:17:21Z [FLINK-2285] [streaming] Removed duplicate call in close from GroupedActiveDiscretizer commit e0bd71a2589ccd7e90c6ca657587749fcd56d766 Author: mbalassi mbala...@apache.org Date: 2015-06-29T09:21:23Z [streaming] Minor streaming code cleanups
[jira] [Updated] (FLINK-2291) Use ZooKeeper to elect JobManager leader and send information to TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-2291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-2291: - Description: Use ZooKeeper to determine the leader of a set of {{JobManager}} s which will act as the responsible {{JobManager}} for all {{TaskManager}}. The {{TaskManager}} will get the address of the leader from ZooKeeper. Related Wiki: [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability] was: Use ZooKeeper to determine the leader of a set of {{JobManager}}s which will act as the responsible {{JobManager}} for all {{TaskManager}}. The {{TaskManager}} will get the address of the leader from ZooKeeper. Related Wiki: [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability] Use ZooKeeper to elect JobManager leader and send information to TaskManagers - Key: FLINK-2291 URL: https://issues.apache.org/jira/browse/FLINK-2291 Project: Flink Issue Type: Sub-task Components: JobManager, TaskManager Affects Versions: 0.10 Reporter: Till Rohrmann Assignee: Till Rohrmann Fix For: 0.10 Use ZooKeeper to determine the leader of a set of {{JobManager}} s which will act as the responsible {{JobManager}} for all {{TaskManager}}. The {{TaskManager}} will get the address of the leader from ZooKeeper. Related Wiki: [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (FLINK-2291) Use ZooKeeper to elect JobManager leader and send information to TaskManagers
[ https://issues.apache.org/jira/browse/FLINK-2291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Till Rohrmann updated FLINK-2291:
---------------------------------
    Description:
Use ZooKeeper to determine the leader of a set of {{JobManagers}} which will act as the responsible {{JobManager}} for all {{TaskManager}}. The {{TaskManager}} will get the address of the leader from ZooKeeper.

Related Wiki: [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]

  was:
Use ZooKeeper to determine the leader of a set of {{JobManager}} s which will act as the responsible {{JobManager}} for all {{TaskManager}}. The {{TaskManager}} will get the address of the leader from ZooKeeper.

Related Wiki: [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]

> Use ZooKeeper to elect JobManager leader and send information to TaskManagers
> -----------------------------------------------------------------------------
>                 Key: FLINK-2291
>                 URL: https://issues.apache.org/jira/browse/FLINK-2291
>             Project: Flink
>          Issue Type: Sub-task
>          Components: JobManager, TaskManager
>    Affects Versions: 0.10
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>             Fix For: 0.10
>
> Use ZooKeeper to determine the leader of a set of {{JobManagers}} which will act as the responsible {{JobManager}} for all {{TaskManager}}. The {{TaskManager}} will get the address of the leader from ZooKeeper.
> Related Wiki: [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability]
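The election scheme FLINK-2291 describes can be illustrated with a self-contained sketch. All names below are hypothetical and nothing here talks to a real ZooKeeper ensemble (Flink's actual implementation would); the sketch only models the recipe ZooKeeper-based leader election typically uses: each candidate registers an ephemeral sequential node, and the live candidate holding the lowest sequence number is the leader whose address the TaskManagers would read.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical in-memory model of ZooKeeper-style leader election.
class LeaderElectionSketch {
    // candidate id -> sequence number assigned at registration time
    // (ZooKeeper would create e.g. /flink/leader/candidate-0000000N).
    private final Map<String, Integer> candidates = new LinkedHashMap<>();
    private int nextSequence = 0;

    // A candidate JobManager registers and receives a monotonically
    // increasing sequence number.
    int register(String candidateId) {
        int seq = nextSequence++;
        candidates.put(candidateId, seq);
        return seq;
    }

    // Simulates the candidate's session expiring (process death); in
    // ZooKeeper the ephemeral node would disappear automatically.
    void deregister(String candidateId) {
        candidates.remove(candidateId);
    }

    // The leader is the live candidate with the smallest sequence number;
    // TaskManagers would fetch this leader's address from ZooKeeper.
    Optional<String> leader() {
        return candidates.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }
}
```

When the current leader dies, leadership passes automatically to the candidate with the next-lowest sequence number, which is what makes the recipe suitable for JobManager failover.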
[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...
Github user peedeeX21 commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33469076 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. + * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. 
+ * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + *.setInitialCentroids(initialCentroids) + *.setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters.
As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + * Defines the initial k centroids of the k clusters. They are used as start off point of the + * algorithm for clustering the data set. The centroids are recalculated as often as set in + *
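The assignment and update steps quoted in the Scaladoc above can be sketched as a standalone iteration. The sketch below is purely illustrative (1-D points for brevity, hypothetical class name) and is not the Flink DataSet implementation under review in this PR:

```java
// One k-means iteration: assign each point to its nearest centroid
// (assignment step), then move each centroid to the mean of its
// assigned points (update step).
class KMeansStepSketch {

    // Assignment step: index of the centroid minimizing the squared
    // Euclidean distance to the point.
    static int nearest(double point, double[] centroids) {
        int best = 0;
        for (int i = 1; i < centroids.length; i++) {
            double dBest = (point - centroids[best]) * (point - centroids[best]);
            double dI = (point - centroids[i]) * (point - centroids[i]);
            if (dI < dBest) best = i;
        }
        return best;
    }

    // Update step: recompute each centroid as the arithmetic mean of the
    // points assigned to it; empty clusters keep their old centroid.
    static double[] iterate(double[] points, double[] centroids) {
        double[] sum = new double[centroids.length];
        int[] count = new int[centroids.length];
        for (double p : points) {
            int c = nearest(p, centroids);
            sum[c] += p;
            count[c]++;
        }
        double[] updated = centroids.clone();
        for (int i = 0; i < centroids.length; i++) {
            if (count[i] > 0) updated[i] = sum[i] / count[i];
        }
        return updated;
    }
}
```

Repeating `iterate` for the configured `NumIterations` yields the heuristic described in the doc comment: each pass can only lower (or keep) the within-cluster sum of squares, but convergence to the global optimum is not guaranteed.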
[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...
Github user peedeeX21 commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33476507 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. + * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. 
+ * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + *.setInitialCentroids(initialCentroids) + *.setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters.
As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + * Defines the initial k centroids of the k clusters. They are used as start off point of the + * algorithm for clustering the data set. The centroids are recalculated as often as set in + *
[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...
Github user gaborhermann commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-116736041

    Sorry for not making myself clear. I would actually go for

    4. Only the Scala function (both in the streaming and batch API)

    I don't understand how changing from the partitioner implementation to the function implementation in the batch API would mess up determining the compatibility of the partitioning. By compatibility I mean that the type of the key must be the same as the input type of the partitioner. I suppose there was another reason (that I do not understand) for choosing the partitioner implementation for the Scala batch API, so if (4) is not an option, I would go for (2) (only partitioner, sync with batch API).
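The convenience gaborhermann is suggesting, wrapping a plain `(key, numPartitions) => partition` function into a `Partitioner[K]` the way KeySelectors wrap functions, can be sketched in isolation. The interface below only mimics the shape of Flink's `Partitioner` contract (`int partition(K key, int numPartitions)`); it is a hypothetical stand-in, not the real `org.apache.flink.api.common.functions.Partitioner`:

```java
import java.util.function.BiFunction;

// Stand-in for the Partitioner contract: map a key and a partition count
// to a partition index in [0, numPartitions).
interface PartitionerSketch<K> {
    int partition(K key, int numPartitions);

    // The proposed wrapper: any two-argument function becomes a
    // Partitioner, so Scala users could pass a plain (K, Int) => Int.
    static <K> PartitionerSketch<K> fromFunction(BiFunction<K, Integer, Integer> f) {
        return (key, numPartitions) -> f.apply(key, numPartitions);
    }
}
```

With such a wrapper the API could accept either form; the key-type compatibility check is unaffected, since the function's first argument type plays the same role as the partitioner's key type parameter.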
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605726#comment-14605726 ] ASF GitHub Bot commented on FLINK-1731: --- Github user peedeeX21 commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33471983 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
+ * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. + * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. 
+ * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + *.setInitialCentroids(initialCentroids) + *.setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + *
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605752#comment-14605752 ] ASF GitHub Bot commented on FLINK-1731: --- Github user peedeeX21 commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33475298 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. 
+ * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. + * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),…,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ≤ || x_p - m_j^(t) ||^2 \forall j, 1 ≤ j ≤ k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. 
+ * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + *.setInitialCentroids(initialCentroids) + *.setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated till the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + *
[jira] [Commented] (FLINK-2138) PartitionCustom for streaming
[ https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14605711#comment-14605711 ]

ASF GitHub Bot commented on FLINK-2138:
---------------------------------------

Github user StephanEwen commented on the pull request:

    https://github.com/apache/flink/pull/872#issuecomment-116710090

    I am confused now, what is it going to be?

    1. Overloading, such that it is Scala function and Partitioner, at the cost of redundant APIs.
    2. Only partitioner (sync with batch API)
    3. Only Scala function (break with batch API)

    I am not a big fan of (1), as these redundant options are confusing blow-ups of the APIs.

> PartitionCustom for streaming
> -----------------------------
>                 Key: FLINK-2138
>                 URL: https://issues.apache.org/jira/browse/FLINK-2138
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 0.9
>            Reporter: Márton Balassi
>            Assignee: Gábor Hermann
>            Priority: Minor
>
> The batch API has support for custom partitioning, this should be added for streaming with a similar signature.
[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...
Github user peedeeX21 commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33471983 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala --- @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.clustering + +import org.apache.flink.api.common.functions.RichMapFunction +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields +import org.apache.flink.api.scala.{DataSet, _} +import org.apache.flink.configuration.Configuration +import org.apache.flink.ml.common.{LabeledVector, _} +import org.apache.flink.ml.math.Breeze._ +import org.apache.flink.ml.math.Vector +import org.apache.flink.ml.metrics.distances.EuclideanDistanceMetric +import org.apache.flink.ml.pipeline._ + +import scala.collection.JavaConverters._ + + +/** + * Implements the KMeans algorithm which calculates cluster centroids based on set of training data + * points and a set of k initial centroids. + * + * [[KMeans]] is a [[Predictor]] which needs to be trained on a set of data points and can then be + * used to assign new points to the learned cluster centroids. 
+ * + * The KMeans algorithm works as described on Wikipedia + * (http://en.wikipedia.org/wiki/K-means_clustering): + * + * Given an initial set of k means m1(1),â¦,mk(1) (see below), the algorithm proceeds by alternating + * between two steps: + * + * ===Assignment step:=== + * + * Assign each observation to the cluster whose mean yields the least within-cluster sum of + * squares (WCSS). Since the sum of squares is the squared Euclidean distance, this is intuitively + * the nearest mean. (Mathematically, this means partitioning the observations according to the + * Voronoi diagram generated by the means). + * + * `S_i^(t) = { x_p : || x_p - m_i^(t) ||^2 ⤠|| x_p - m_j^(t) ||^2 \forall j, 1 ⤠j ⤠k}`, + * where each `x_p` is assigned to exactly one `S^{(t)}`, even if it could be assigned to two or + * more of them. + * + * ===Update step:=== + * + * Calculate the new means to be the centroids of the observations in the new clusters. + * + * `m^{(t+1)}_i = ( 1 / |S^{(t)}_i| ) \sum_{x_j \in S^{(t)}_i} x_j` + * + * Since the arithmetic mean is a least-squares estimator, this also minimizes the within-cluster + * sum of squares (WCSS) objective. + * + * @example + * {{{ + * val trainingDS: DataSet[Vector] = env.fromCollection(Clustering.trainingData) + * val initialCentroids: DataSet[LabledVector] = env.fromCollection(Clustering.initCentroids) + * + * val kmeans = KMeans() + *.setInitialCentroids(initialCentroids) + *.setNumIterations(10) + * + * kmeans.fit(trainingDS) + * + * // getting the computed centroids + * val centroidsResult = kmeans.centroids.get.collect() + * + * // get matching clusters for new points + * val testDS: DataSet[Vector] = env.fromCollection(Clustering.testData) + * val clusters: DataSet[LabeledVector] = kmeans.predict(testDS) + * }}} + * + * =Parameters= + * + * - [[org.apache.flink.ml.clustering.KMeans.NumIterations]]: + * Defines the number of iterations to recalculate the centroids of the clusters. 
As it + * is a heuristic algorithm, there is no guarantee that it will converge to the global optimum. The + * centroids of the clusters and the reassignment of the data points will be repeated until the + * given number of iterations is reached. + * (Default value: '''10''') + * + * - [[org.apache.flink.ml.clustering.KMeans.InitialCentroids]]: + * Defines the initial k centroids of the k clusters. They are used as the starting point of the + * algorithm for clustering the data set. The centroids are recalculated as often as set in + *
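The two alternating steps quoted above can be sketched in a few lines of plain Scala. This is an illustrative toy (no Flink involved; `KMeansStep` is a hypothetical helper, not the PR's implementation): points are assigned to their nearest centroid by squared Euclidean distance, then each centroid is recomputed as the mean of its cluster.

```scala
object KMeansStep {
  type Point = Array[Double]

  def sqDist(a: Point, b: Point): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Assignment step: each point joins the cluster whose centroid minimizes
  // the squared Euclidean distance (the WCSS criterion above).
  def assign(points: Seq[Point], centroids: Seq[Point]): Map[Int, Seq[Point]] =
    points.groupBy(p => centroids.indices.minBy(i => sqDist(p, centroids(i))))

  // Update step: the new centroid is the arithmetic mean of its assigned points.
  def update(clusters: Map[Int, Seq[Point]], dim: Int): Map[Int, Point] =
    clusters.map { case (i, ps) =>
      val sums = ps.foldLeft(Array.fill(dim)(0.0)) { (acc, p) =>
        acc.zip(p).map { case (s, x) => s + x }
      }
      i -> sums.map(_ / ps.size)
    }
}
```

Running `assign` then `update` once per pass, for the configured number of iterations, mirrors the loop the documentation describes.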
[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...
Github user peedeeX21 commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33475321 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---
[jira] [Commented] (FLINK-1731) Add kMeans clustering algorithm to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605753#comment-14605753 ] ASF GitHub Bot commented on FLINK-1731: --- Github user peedeeX21 commented on a diff in the pull request: https://github.com/apache/flink/pull/700#discussion_r33475321 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/clustering/KMeans.scala ---
[jira] [Commented] (FLINK-2138) PartitionCustom for streaming
[ https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605818#comment-14605818 ] ASF GitHub Bot commented on FLINK-2138: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/872#issuecomment-116745707 The partitioner function in Scala was simply added as a mirror of the Java API. The batch API is stable, that means at most we can add a Scala function and deprecate the partitioner. PartitionCustom for streaming - Key: FLINK-2138 URL: https://issues.apache.org/jira/browse/FLINK-2138 Project: Flink Issue Type: New Feature Components: Streaming Affects Versions: 0.9 Reporter: Márton Balassi Assignee: Gábor Hermann Priority: Minor The batch API has support for custom partitioning, this should be added for streaming with a similar signature. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/872#issuecomment-116745707 The partitioner function in Scala was simply added as a mirror of the Java API. The batch API is stable, that means at most we can add a Scala function and deprecate the partitioner. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2138) PartitionCustom for streaming
[ https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605880#comment-14605880 ] ASF GitHub Bot commented on FLINK-2138: --- Github user gaborhermann commented on the pull request: https://github.com/apache/flink/pull/872#issuecomment-116755999 Okay, then I will
* deprecate the partitioner implementation in the batch API
* add the function implementation to the batch API
* add the function implementation to the streaming API and remove the partitioner implementation (so streaming will only have function implementation).
As this PR is not merged yet we do not break the streaming API. Is it okay? I guess it's worth it. This way Scala users will be able to write more concise code and they will not get confused by the overloaded functions because the ones with the partitioner will be deprecated.
[GitHub] flink pull request: [FLINK-2138] Added custom partitioning to Data...
Github user gaborhermann commented on the pull request: https://github.com/apache/flink/pull/872#issuecomment-116755999 Okay, then I will
* deprecate the partitioner implementation in the batch API
* add the function implementation to the batch API
* add the function implementation to the streaming API and remove the partitioner implementation (so streaming will only have function implementation).
As this PR is not merged yet we do not break the streaming API. Is it okay? I guess it's worth it. This way Scala users will be able to write more concise code and they will not get confused by the overloaded functions because the ones with the partitioner will be deprecated.
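The wrapping floated earlier in this thread — turning a Scala function `(K, Int) => Int` into a `Partitioner[K]`, analogous to how key-extraction functions are wrapped into `KeySelector`s — can be sketched as follows. The `Partitioner` trait is declared locally so the snippet is self-contained; it mirrors the shape of Flink's `Partitioner[K]` interface (`partition(key, numPartitions)`):

```scala
// Local stand-in mirroring the shape of Flink's Partitioner[K] interface.
trait Partitioner[K] extends Serializable {
  def partition(key: K, numPartitions: Int): Int
}

// Wrap a plain Scala function into a Partitioner, as the comments suggest.
def wrapPartitioner[K](fun: (K, Int) => Int): Partitioner[K] =
  new Partitioner[K] {
    override def partition(key: K, numPartitions: Int): Int =
      fun(key, numPartitions)
  }
```

A Scala API method could then accept the function form and delegate to the existing partitioner-based overload, which is what makes deprecating the latter low-risk.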
[jira] [Commented] (FLINK-2282) Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1
[ https://issues.apache.org/jira/browse/FLINK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605335#comment-14605335 ] Gyula Fora commented on FLINK-2282: --- Okay Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1 --- Key: FLINK-2282 URL: https://issues.apache.org/jira/browse/FLINK-2282 Project: Flink Issue Type: Task Components: Streaming Affects Versions: 0.9.1 Reporter: Gyula Fora Non-grouped reduces/fold/aggregations have been removed for DataStream in 0.10 snapshot, so these features should be deprecated for the current release, and some note should be added to the javadocs. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2282) Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1
[ https://issues.apache.org/jira/browse/FLINK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-2282. - Resolution: Not A Problem
[GitHub] flink pull request: [FLINK-1731] [ml] Implementation of Feature K-...
Github user peedeeX21 commented on the pull request: https://github.com/apache/flink/pull/700#issuecomment-116850459 I am having some trouble fitting our predictor into the new API. The problem is that with `PredictOperation` the type of the model has to be defined. A `DataSet` of this type is the output of `getModel`. For the `predict` method the input is just an object of this type. In our case our model is a `DataSet` of `LabeledVector`s (the centroids). This means I cannot implement a `PredictOperation` due to that restriction. For me the API feels a bit inconsistent in that case. For now I implemented only a `PredictDataSetOperation`.
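The mismatch described in this comment can be seen from the shape of the operation. The trait below is a simplified paraphrase for illustration, not FlinkML's exact signature: `predict` receives a single model value per call, which suits e.g. a weight vector but not a model that is itself a whole `DataSet` of centroids — hence the fallback to a `PredictDataSetOperation` that operates on entire data sets.

```scala
// Paraphrased shape of a per-value predict operation: the model is ONE value.
trait PredictOperation[Model, In, Out] {
  def predict(value: In, model: Model): Out
}

// Fine when the model is a single value, e.g. a linear model's weights...
val linear = new PredictOperation[Seq[Double], Seq[Double], Double] {
  def predict(x: Seq[Double], w: Seq[Double]): Double =
    x.zip(w).map { case (a, b) => a * b }.sum
}
// ...but a KMeans model is a DataSet[LabeledVector] of centroids, which
// cannot be passed as the single `model` argument here.
```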
[jira] [Updated] (FLINK-2066) Make delay between execution retries configurable
[ https://issues.apache.org/jira/browse/FLINK-2066?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-2066: - Assignee: Nuno Miguel Marques dos Santos Make delay between execution retries configurable - Key: FLINK-2066 URL: https://issues.apache.org/jira/browse/FLINK-2066 Project: Flink Issue Type: Improvement Components: Core Affects Versions: 0.9 Reporter: Stephan Ewen Assignee: Nuno Miguel Marques dos Santos Labels: starter Flink allows to specify a delay between execution retries. This helps to let some external failure causes fully manifest themselves before the restart is attempted. The delay is currently defined only system wide. We should add it to the {{ExecutionConfig}} of a job to allow per-job specification. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2282) Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1
[ https://issues.apache.org/jira/browse/FLINK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605297#comment-14605297 ] Stephan Ewen commented on FLINK-2282: - Minor versions are bug-fix versions, major versions are feature versions. Deprecating in minor releases violates semantic versioning (http://semver.org/). The code should have been deprecated in 0.10.0 and removed for 0.11.0 to follow proper standards.
[jira] [Created] (FLINK-2289) Make JobManager highly available
Till Rohrmann created FLINK-2289: Summary: Make JobManager highly available Key: FLINK-2289 URL: https://issues.apache.org/jira/browse/FLINK-2289 Project: Flink Issue Type: Improvement Reporter: Till Rohrmann Assignee: Till Rohrmann Currently, the {{JobManager}} is the single point of failure in the Flink system. If it fails, then your job cannot be recovered and the Flink cluster is no longer able to receive new jobs. Therefore, it is crucial to make the {{JobManager}} fault tolerant so that the Flink cluster can recover from failed {{JobManager}}. As a first step towards this goal, I propose to make the {{JobManager}} highly available by starting multiple instances and using Apache ZooKeeper to elect a leader. The leader is responsible for the execution of the Flink job. In case that the {{JobManager}} dies, one of the other running {{JobManager}} will be elected as the leader and take over the role of the leader. The {{Client}} and the {{TaskManager}} will automatically detect the new {{JobManager}} by querying the ZooKeeper cluster. Note that this does not achieve full fault tolerance for the {{JobManager}} but it allows the cluster to recover from failed {{JobManager}}. The design of high-availability for the {{JobManager}} is tracked in the wiki here [1]. Resources: [1] [https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability] -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2283) Make grouped reduce/fold/aggregations stateful using Partitioned state
[ https://issues.apache.org/jira/browse/FLINK-2283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605298#comment-14605298 ] Stephan Ewen commented on FLINK-2283: - Are these again implemented in java maps, or are there any considerations about out-of-core and managed memory here? Make grouped reduce/fold/aggregations stateful using Partitioned state -- Key: FLINK-2283 URL: https://issues.apache.org/jira/browse/FLINK-2283 Project: Flink Issue Type: Improvement Components: Streaming Affects Versions: 0.10 Reporter: Gyula Fora Priority: Minor Currently the inner state of the grouped aggregations are not persisted as an operator state. These operators should be reimplemented to use the newly introduced partitioned state abstractions which will make them fault tolerant and scalable for the future. A suggested implementation would be to use a stateful mapper to implement the desired behaviour. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2282) Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1
[ https://issues.apache.org/jira/browse/FLINK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605330#comment-14605330 ] Stephan Ewen commented on FLINK-2282: - Since the API is labeled beta, it is okay to do breaking changes.
[jira] [Created] (FLINK-2284) Confusing/inconsistent PartitioningStrategy
Stephan Ewen created FLINK-2284: --- Summary: Confusing/inconsistent PartitioningStrategy Key: FLINK-2284 URL: https://issues.apache.org/jira/browse/FLINK-2284 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.9 Reporter: Stephan Ewen The PartitioningStrategy in {{org.apache.flink.streaming.runtime.partitioner.StreamPartitioner.java}} is non standard and not easily understandable. What form of partitioning is `SHUFFLE`? Shuffle just means redistribute, it says nothing about what it does. Same with `DISTRIBUTE`. Also `GLOBAL` is not a well-defined/established term. Why is `GROUPBY` a partition type? Doesn't grouping simply hash partition (like I assume SHUFFLE means), so why does it have an extra entry? Sticking with principled and established names/concepts is important to allow people to collaborate on the code. Why not stick with the partitioning types defined in the batch API? They are well defined and named: ``` NONE, FORWARD, RANDOM, HASH, RANGE, FORCED_REBALANCE, BROADCAST, CUSTOM ``` -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-2282) Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1
[ https://issues.apache.org/jira/browse/FLINK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605297#comment-14605297 ] Stephan Ewen edited comment on FLINK-2282 at 6/29/15 8:48 AM: -- Minor versions are bug-fix versions, major versions are feature versions. Deprecating in minor releases violates semantic versioning (http://semver.org/). The code should have been deprecated in 0.10.0 and removed for 0.11.0 to follow proper standards. (UPDATE: I assume that 0.9, 0.10, ... are actually the equivalent of major versions here.) was (Author: stephanewen): Minor versions are bug-fix versions, major versions are feature versions. Deprecating in minor releases violates semantic versioning (http://semver.org/) The code should have been deprecated in 0.10.0 and removed for 0.11.0 to follow proper standards.
[jira] [Created] (FLINK-2286) Window ParallelMerge sometimes swallows elements of the last window
Márton Balassi created FLINK-2286: - Summary: Window ParallelMerge sometimes swallows elements of the last window Key: FLINK-2286 URL: https://issues.apache.org/jira/browse/FLINK-2286 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.9, 0.10 Reporter: Márton Balassi Assignee: Márton Balassi Last windows in the stream that do not have parts at all the parallel operator instances get swallowed by the ParallelMerge. To resolve this ParallelMerge should be an operator instead of a function, so the close method can access the collector and emit these. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2289) Make JobManager highly available
[ https://issues.apache.org/jira/browse/FLINK-2289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605308#comment-14605308 ] Ufuk Celebi commented on FLINK-2289: Issue creation race... ;) Can you copy your text to FLINK-2287?
[jira] [Assigned] (FLINK-2285) Active policy emits elements of the last window twice
[ https://issues.apache.org/jira/browse/FLINK-2285?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Márton Balassi reassigned FLINK-2285: - Assignee: Márton Balassi Active policy emits elements of the last window twice - Key: FLINK-2285 URL: https://issues.apache.org/jira/browse/FLINK-2285 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.9, 0.10 Reporter: Márton Balassi Assignee: Márton Balassi The root cause is some duplicate code between the close methods of the GroupedActiveDiscretizer and the GroupedStreamDiscretizer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (FLINK-2285) Active policy emits elements of the last window twice
Márton Balassi created FLINK-2285: - Summary: Active policy emits elements of the last window twice Key: FLINK-2285 URL: https://issues.apache.org/jira/browse/FLINK-2285 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.9, 0.10 Reporter: Márton Balassi The root cause is some duplicate code between the close methods of the GroupedActiveDiscretizer and the GroupedStreamDiscretizer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2284) Confusing/inconsistent PartitioningStrategy
[ https://issues.apache.org/jira/browse/FLINK-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605312#comment-14605312 ] Gyula Fora commented on FLINK-2284: --- We took these names from Storm, which we saw at that point as a standard for streaming concepts.
[GitHub] flink pull request: [FLINK-1735] Feature Hasher
Github user ChristophAl commented on the pull request: https://github.com/apache/flink/pull/665#issuecomment-116517528 Hi, after I rebased it on master and implemented the new pipeline interface, I have some follow-up questions regarding the types we should accept for feature hashing. I can think of Iterable[String] and Iterable[(String, Int)] for documents, as well as (Int, Iterable[String]) and (Int, Iterable[(String, Int)]) for documents having some kind of index. So I'm not sure whether it is required to hash arbitrary types by using .hashCode(). Also note: in case the nonNegative parameter is set to true, the output can be used as TF in TF-IDF.
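For readers unfamiliar with the technique under discussion, a minimal version of the hashing trick looks roughly like this (`hashFeatures` is a hypothetical helper, not the PR's code): each token is hashed into one of `numFeatures` buckets, and with `nonNegative = true` the resulting counts remain usable directly as term frequencies for TF-IDF.

```scala
def hashFeatures(tokens: Iterable[String],
                 numFeatures: Int,
                 nonNegative: Boolean): Array[Double] = {
  val vec = Array.fill(numFeatures)(0.0)
  for (t <- tokens) {
    val h = t.hashCode
    // map the hash into [0, numFeatures) even when h is negative
    val idx = ((h % numFeatures) + numFeatures) % numFeatures
    // a sign bit spreads collisions around zero; a real implementation would
    // derive it from an independent hash rather than reusing the same one
    val sign = if (nonNegative || (h & 1) == 0) 1.0 else -1.0
    vec(idx) += sign
  }
  vec
}
```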
[jira] [Created] (FLINK-2287) Implement JobManager high availability
Ufuk Celebi created FLINK-2287: -- Summary: Implement JobManager high availability Key: FLINK-2287 URL: https://issues.apache.org/jira/browse/FLINK-2287 Project: Flink Issue Type: Improvement Components: JobManager, TaskManager Reporter: Ufuk Celebi Fix For: 0.10 The problem: The JobManager (JM) is a single point of failure. When it crashes, TaskManagers (TM) fail all running jobs and try to reconnect to the same JM. A failed JM looses all state and can not resume the running jobs; even if it recovers and the TMs reconnect. Solution: implement JM fault tolerance/high availability by having multiple JM instances running with one as leader and the other(s) in standby. The exact coordination and state update protocol between JM, TM, and clients is covered in sub-tasks/issues. Related Wiki: https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2230] handling null values for TupleSer...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/867#issuecomment-116520207 @Shiti No need to apologize, that is all right. We are discussing a tricky thing here... It is true, right now one needs to be aware of missing values in the application logic. I personally like that: an `Option` makes it very explicit and easy to grasp. Also the byte overhead for the option field is okay, as most fields will not be nullable and will not carry this overhead. The serialization logic is also more CPU efficient that way. What is true is that there is a problem when converting from a `Table` to a `DataSet[Tuple]`. Tables with nullable fields could only be converted to data sets of class types, which would support nullable fields.
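The explicitness argument above is easy to see in miniature (`Row` is a hypothetical record type for illustration, unrelated to Flink's classes): only the nullable field carries the `Option` wrapper, and the application logic must consciously handle the `None` case.

```scala
// Only the nullable field pays the Option overhead; `id` stays a plain Int.
case class Row(id: Int, comment: Option[String])

def describe(r: Row): String =
  r.comment match {
    case Some(c) => s"${r.id}: $c"
    case None    => s"${r.id}: <missing>"
  }
```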
[jira] [Commented] (FLINK-2138) PartitionCustom for streaming
[ https://issues.apache.org/jira/browse/FLINK-2138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605296#comment-14605296 ] ASF GitHub Bot commented on FLINK-2138: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/872#issuecomment-116528738 I actually like this approach. We had the same discussion for the batch API and settled on this, because: - You can always chain a `FlatMapFunction` with a `partitionCustom()` request to solve all the above situations. - This interface allows an easy Java 8 lambda implementation and works well with the type extraction. - It seems to cover the majority of cases more elegantly, as there is no need for array wrapping in the user code. PartitionCustom for streaming - Key: FLINK-2138 URL: https://issues.apache.org/jira/browse/FLINK-2138 Project: Flink Issue Type: New Feature Components: Streaming Affects Versions: 0.9 Reporter: Márton Balassi Assignee: Gábor Hermann Priority: Minor The batch API has support for custom partitioning; this should be added for streaming with a similar signature.
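The contract under discussion can be sketched in plain Java. The single-method interface mirrors the shape of Flink's `Partitioner<K>` (`partition(key, numPartitions)` returns the target channel), which is what makes the Java 8 lambda usage in the comment possible; the bucketing "runner" below is a hypothetical stand-in for illustration, not Flink runtime code:

```java
import java.util.*;
import java.util.function.Function;

// Sketch of the custom-partitioner contract discussed in FLINK-2138/PR #872.
// The interface shape mirrors Flink's Partitioner<K>; partitionCustom here is
// a hypothetical local runner, not the actual DataStream operator.
public class PartitionSketch {

    @FunctionalInterface
    interface Partitioner<K> {
        int partition(K key, int numPartitions);
    }

    // Routes each element to a channel chosen by the partitioner applied to
    // the element's extracted key. Being a single-method interface, the
    // partitioner can be supplied as a Java 8 lambda.
    static <T, K> List<List<T>> partitionCustom(
            List<T> data, Partitioner<K> partitioner,
            Function<T, K> keySelector, int numPartitions) {
        List<List<T>> channels = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            channels.add(new ArrayList<>());
        }
        for (T element : data) {
            int p = partitioner.partition(keySelector.apply(element), numPartitions);
            channels.get(p).add(element);
        }
        return channels;
    }
}
```

This also illustrates Gábor's Scala question above: a function `(K, Int) => Int` and the `Partitioner[K]` class are interchangeable precisely because the interface has a single abstract method.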
[jira] [Closed] (FLINK-2281) Allow chaining of operators with multiple inputs
[ https://issues.apache.org/jira/browse/FLINK-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gyula Fora closed FLINK-2281. - Resolution: Later Allow chaining of operators with multiple inputs Key: FLINK-2281 URL: https://issues.apache.org/jira/browse/FLINK-2281 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Gyula Fora Assignee: Gyula Fora Priority: Minor Currently only operators with one input can be chained for better performance. In principle there is no reason why this should not be extended to multiple-input operators (where chaining is applicable, i.e. forward-connected) to allow maximal speedup. The concern usually arises that it might not be good to always chain everything in case of computation-heavy operators. In these cases (if the job is otherwise viable) chaining only increases latency for some operations, while throughput is increased by raising the parallelism to exploit the freed resources. In any case, chaining should remain tunable on an operator level.
[jira] [Commented] (FLINK-2281) Allow chaining of operators with multiple inputs
[ https://issues.apache.org/jira/browse/FLINK-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605323#comment-14605323 ] Gyula Fora commented on FLINK-2281: --- On second thought, this would introduce considerable complexity in the record readers and also in the FT implementation; I am closing this issue for now. Allow chaining of operators with multiple inputs Key: FLINK-2281 URL: https://issues.apache.org/jira/browse/FLINK-2281 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Gyula Fora Assignee: Gyula Fora Priority: Minor
[jira] [Commented] (FLINK-2281) Allow chaining of operators with multiple inputs
[ https://issues.apache.org/jira/browse/FLINK-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605261#comment-14605261 ] Stephan Ewen commented on FLINK-2281: - Nice idea, but probably not very critical. I would assume that most real-world connected inputs involve some repartition patterns anyway. Allow chaining of operators with multiple inputs Key: FLINK-2281 URL: https://issues.apache.org/jira/browse/FLINK-2281 Project: Flink Issue Type: Improvement Components: Streaming Reporter: Gyula Fora Assignee: Gyula Fora Priority: Minor
[jira] [Commented] (FLINK-1735) Add FeatureHasher to machine learning library
[ https://issues.apache.org/jira/browse/FLINK-1735?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605260#comment-14605260 ] ASF GitHub Bot commented on FLINK-1735: --- Github user ChristophAl commented on the pull request: https://github.com/apache/flink/pull/665#issuecomment-116517528 Hi, after I rebased it on master and implemented the new pipeline interface, I have some follow-up questions regarding the types we should accept for feature hashing. I can think of Iterable[String] and Iterable[(String, Int)] for documents, as well as (Int, Iterable[String]) and (Int, Iterable[(String, Int)]) for documents having some kind of index. So I'm not sure whether it is required to hash arbitrary types by using .hashCode()? Also note that in case the nonNegative parameter is set to true, the output can be used as TF in TF-IDF. Add FeatureHasher to machine learning library - Key: FLINK-1735 URL: https://issues.apache.org/jira/browse/FLINK-1735 Project: Flink Issue Type: New Feature Components: Machine Learning Library Reporter: Till Rohrmann Assignee: Felix Neutatz Labels: ML Using the hashing trick [1,2] is a common way to vectorize arbitrary feature values. The hash of the feature value is used to calculate its index for a vector entry. In order to mitigate possible collisions, a second hash function is used to calculate the sign of the update value which is added to the vector entry. This way, it is likely that collisions will simply cancel out. A feature hasher would also be helpful for NLP problems, where it could be used to vectorize bag-of-words or n-gram feature vectors. Resources: [1] [https://en.wikipedia.org/wiki/Feature_hashing] [2] [http://scikit-learn.org/stable/modules/feature_extraction.html#feature-extraction]
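The hashing trick described in the issue, including the `nonNegative` behaviour mentioned in the comment, can be sketched in a few lines of Java. The particular hash choices here (`hashCode()` for the index, a salted rehash for the sign) are illustrative assumptions, not what the PR implements:

```java
import java.util.*;

// Minimal sketch of the hashing trick from FLINK-1735: one hash chooses the
// vector index, a second independent hash chooses the sign of the update, so
// colliding features tend to cancel out rather than accumulate. The concrete
// hash functions below are placeholders, not those of the actual PR.
public class FeatureHasherSketch {

    static double[] hashFeatures(Iterable<String> tokens, int dim, boolean nonNegative) {
        double[] vec = new double[dim];
        for (String token : tokens) {
            int idx = Math.floorMod(token.hashCode(), dim);
            // second hash for the sign; salting decorrelates it from the index hash
            int sign = ((token + "#salt").hashCode() & 1) == 0 ? 1 : -1;
            vec[idx] += sign;
        }
        if (nonNegative) {
            // take absolute values so the output can serve as TF in TF-IDF
            for (int i = 0; i < dim; i++) {
                vec[i] = Math.abs(vec[i]);
            }
        }
        return vec;
    }
}
```

Note that every input shape ChristophAl lists (`Iterable[String]`, `Iterable[(String, Int)]`, with or without a document index) reduces to iterating tokens and adding a signed count, which is why the core loop stays the same regardless of which types are ultimately accepted.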
[jira] [Created] (FLINK-2288) Setup ZooKeeper for distributed coordination
Ufuk Celebi created FLINK-2288: -- Summary: Setup ZooKeeper for distributed coordination Key: FLINK-2288 URL: https://issues.apache.org/jira/browse/FLINK-2288 Project: Flink Issue Type: Sub-task Reporter: Ufuk Celebi Assignee: Ufuk Celebi Fix For: 0.10 Having standby JM instances for JobManager high availability requires distributed coordination between JM, TM, and clients. For this, we will use ZooKeeper (ZK). Pros: - Proven solution (other projects use it for this as well) - Apache TLP with a large community, docs, and a library with the required recipes, such as leader election (see below) Related Wiki: https://cwiki.apache.org/confluence/display/FLINK/JobManager+High+Availability
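The leader-election recipe the sub-task refers to works roughly as follows: each JM candidate creates an ephemeral sequential znode; the candidate holding the lowest sequence number is the leader, and every other candidate watches only its immediate predecessor (not the leader) to avoid a herd effect when a node dies. The sketch below simulates just that selection logic over plain znode names, without a live ZooKeeper ensemble, so the names and class are purely illustrative:

```java
import java.util.*;

// Sketch of the standard ZooKeeper leader-election recipe (as referenced in
// FLINK-2288), simulated over plain znode names with no ZK client involved:
// lowest sequential znode wins; every other candidate watches its predecessor.
public class LeaderElectionSketch {

    // znode names like "jm_0000000003"; the smallest name (lowest sequence
    // number) identifies the current leader
    static String leader(List<String> znodes) {
        return Collections.min(znodes);
    }

    // the znode a candidate should place its watch on: its immediate
    // predecessor in sequence order, or null if the candidate is the leader
    static String watchTarget(List<String> znodes, String self) {
        List<String> sorted = new ArrayList<>(znodes);
        Collections.sort(sorted);
        int i = sorted.indexOf(self);
        return i <= 0 ? null : sorted.get(i - 1);
    }
}
```

In a real deployment these znodes would be ephemeral, so a crashed JM's node disappears automatically and exactly one standby (the failed node's successor) is notified to re-check leadership.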
[jira] [Commented] (FLINK-2282) Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1
[ https://issues.apache.org/jira/browse/FLINK-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14605319#comment-14605319 ] Gyula Fora commented on FLINK-2282: --- You are right, we should have thought about this before the release. I don't mind just removing them completely without notice; I was just suggesting that maybe it is now better to deprecate them. Deprecate non-grouped stream reduce/fold/aggregations for 0.9.1 --- Key: FLINK-2282 URL: https://issues.apache.org/jira/browse/FLINK-2282 Project: Flink Issue Type: Task Components: Streaming Affects Versions: 0.9.1 Reporter: Gyula Fora Non-grouped reduce/fold/aggregations have been removed from DataStream in the 0.10 snapshot, so these features should be deprecated for the current release, and a note should be added to the Javadocs.