[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/861#discussion_r37141049

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.statistics
    +
    +import org.apache.flink.ml.statistics.FieldType._
    +
    +import scala.collection.mutable
    +
    +/** Class to represent Field statistics.
    + *
    + * =Parameters=
    + * -[[fieldType]]:
    + *   Type of this field, [[DISCRETE]] or [[CONTINUOUS]]
    + *
    + * For [[DISCRETE]] fields, [[entropy]], [[gini]] and [[categoryCounts]] are provided.
    + * For [[CONTINUOUS]] fields, [[min]], [[max]], [[mean]] and [[variance]] are provided.
    + *
    + */
    +class FieldStats(val fieldType: FieldType) extends Serializable {
    +  // field parameters
    +  private [statistics] var _min: Double = _
    +  private [statistics] var _max: Double = _
    +  private [statistics] var _mean: Double = _
    +  private [statistics] var _variance: Double = _
    +  private [statistics] var _counts: mutable.HashMap[Double, Int] = _
    +
    +  //-- Access methods --//
    +  def min: Double = {
    +    exception(DISCRETE)
    +    _min
    +  }
    +
    +  def max: Double = {
    +    exception(DISCRETE)
    +    _max
    +  }
    +
    +  def mean: Double = {
    +    exception(DISCRETE)
    +    _mean
    +  }
    +
    +  def variance: Double = {
    +    exception(DISCRETE)
    +    _variance
    +  }
    +
    +  def categoryCounts: mutable.HashMap[Double, Int] = {
    +    exception(CONTINUOUS)
    +    _counts
    +  }
    +
    +  /** Returns the entropy value for this [[DISCRETE]] field. */
    +  def entropy: Double = {
    +    exception(CONTINUOUS)
    +    val total: Double = _counts.valuesIterator.sum
    +    3.322 * _counts.iterator.map(x => -(x._2 / total) * Math.log10(x._2 / total)).sum
    +  }
    +
    +  /** Returns the Gini impurity for this [[DISCRETE]] field. */
    +  def gini: Double = {
    +    exception(CONTINUOUS)
    +    val total: Double = _counts.valuesIterator.sum
    +    1 - _counts.iterator.map(x => x._2 * x._2).sum / (total * total)
    +  }
    +
    +  //-- Setter methods --//
    +  private [statistics] def setContinuousParameters(
    --- End diff --

    Gini impurity should be accessible to the user even outside the package. It is a property of the field itself and we're not using it ourselves anyway.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
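The diff above computes entropy via `3.322 * Math.log10(...)`, where 3.322 approximates `1 / log10(2)` to convert to log base 2. As a rough standalone sketch of what the two discrete-field statistics reduce to (plain Java with hypothetical names, not the Flink API; this version uses an exact base-2 logarithm instead of the 3.322 approximation):

```java
import java.util.HashMap;
import java.util.Map;

public class FieldStatsSketch {

    // Shannon entropy in bits: -sum(p * log2(p)) over category probabilities.
    static double entropy(Map<Double, Integer> counts) {
        double total = counts.values().stream().mapToInt(Integer::intValue).sum();
        return counts.values().stream()
                .mapToDouble(c -> -(c / total) * (Math.log(c / total) / Math.log(2)))
                .sum();
    }

    // Gini impurity: 1 - sum(p^2) over category probabilities.
    static double gini(Map<Double, Integer> counts) {
        double total = counts.values().stream().mapToInt(Integer::intValue).sum();
        return 1.0 - counts.values().stream()
                .mapToDouble(c -> (double) c * c)
                .sum() / (total * total);
    }

    public static void main(String[] args) {
        Map<Double, Integer> counts = new HashMap<>();
        counts.put(0.0, 5); // two equally likely categories
        counts.put(1.0, 5);
        System.out.println(entropy(counts)); // 1.0 bit for a uniform two-category field
        System.out.println(gini(counts));    // 0.5
    }
}
```

For a uniform two-category field, entropy is 1 bit and Gini impurity is 0.5, which is a quick sanity check on both formulas.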
[jira] [Commented] (FLINK-2477) Add test for SocketClientSink
[ https://issues.apache.org/jira/browse/FLINK-2477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698510#comment-14698510 ]

ASF GitHub Bot commented on FLINK-2477:
---------------------------------------

Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-131473233

    Ok, I've done it. Thank you!

Add test for SocketClientSink
-----------------------------
                 Key: FLINK-2477
                 URL: https://issues.apache.org/jira/browse/FLINK-2477
             Project: Flink
          Issue Type: Test
          Components: Streaming
    Affects Versions: 0.10
         Environment: win7 32bit; linux
            Reporter: Huang Wei
            Priority: Minor
             Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

Add some tests for SocketClientSink.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...
Github user HuangWHWHW commented on the pull request:

    https://github.com/apache/flink/pull/977#issuecomment-131473233

    Ok, I've done it. Thank you!
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/861#issuecomment-131474758

    Hi @chiwanpark, thanks for reviewing this. This was my first time working in Scala, so I hope you'll forgive the slight mistakes (oversights, perhaps?). I've tried to address most of your comments and left notes where I was not sure.
[jira] [Created] (FLINK-2528) MatchTaskTest failure
Sachin Goel created FLINK-2528:
----------------------------------

             Summary: MatchTaskTest failure
                 Key: FLINK-2528
                 URL: https://issues.apache.org/jira/browse/FLINK-2528
             Project: Flink
          Issue Type: Bug
            Reporter: Sachin Goel

MatchTaskTest fails with a NullPointerException. Here's the log: https://s3.amazonaws.com/archive.travis-ci.org/jobs/75780253/log.txt and the relevant parts of the trace:

{code}
Exception in thread "Thread-154" java.lang.AssertionError: Canceling task failed: java.lang.NullPointerException
	at org.apache.flink.runtime.operators.testutils.DriverTestBase.cancel(DriverTestBase.java:271)
	at org.apache.flink.runtime.operators.testutils.TaskCancelThread.run(TaskCancelThread.java:60)
	at org.junit.Assert.fail(Assert.java:88)
	at org.apache.flink.runtime.operators.testutils.TaskCancelThread.run(TaskCancelThread.java:68)
{code}

{code}
"Thread-153" prio=10 tid=0x7fc1e1338800 nid=0x5cd6 waiting on condition [0x7fc1d2b1]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at org.apache.flink.runtime.operators.MatchTaskTest$MockDelayingMatchStub.join(MatchTaskTest.java:984)
	at org.apache.flink.runtime.operators.MatchTaskTest$MockDelayingMatchStub.join(MatchTaskTest.java:978)
	at org.apache.flink.runtime.operators.sort.AbstractMergeIterator.crossMwithNValues(AbstractMergeIterator.java:297)
	at org.apache.flink.runtime.operators.sort.AbstractMergeIterator.crossMatchingGroup(AbstractMergeIterator.java:146)
	at org.apache.flink.runtime.operators.sort.AbstractMergeInnerJoinIterator.callWithNextKey(AbstractMergeInnerJoinIterator.java:105)
	at org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:208)
	at org.apache.flink.runtime.operators.testutils.DriverTestBase.testDriverInternal(DriverTestBase.java:208)
	at org.apache.flink.runtime.operators.testutils.DriverTestBase.testDriver(DriverTestBase.java:174)
	at org.apache.flink.runtime.operators.MatchTaskTest$3.run(MatchTaskTest.java:520)
{code}
[jira] [Created] (FLINK-2529) fix on some unused code for flink-runtime
Huang Wei created FLINK-2529:
--------------------------------

             Summary: fix on some unused code for flink-runtime
                 Key: FLINK-2529
                 URL: https://issues.apache.org/jira/browse/FLINK-2529
             Project: Flink
          Issue Type: Improvement
          Components: Local Runtime
    Affects Versions: 0.10
            Reporter: Huang Wei
            Priority: Minor
             Fix For: 0.10
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/861#discussion_r37141028

    --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala ---
    @@ -0,0 +1,119 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements. See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership. The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.ml.statistics
    +
    +import org.apache.flink.ml.statistics.FieldType._
    +
    +import scala.collection.mutable
    +
    +/** Class to represent Field statistics.
    + *
    + * =Parameters=
    + * -[[fieldType]]:
    + *   Type of this field, [[DISCRETE]] or [[CONTINUOUS]]
    + *
    + * For [[DISCRETE]] fields, [[entropy]], [[gini]] and [[categoryCounts]] are provided.
    + * For [[CONTINUOUS]] fields, [[min]], [[max]], [[mean]] and [[variance]] are provided.
    + *
    + */
    +class FieldStats(val fieldType: FieldType) extends Serializable {
    +  // field parameters
    +  private [statistics] var _min: Double = _
    +  private [statistics] var _max: Double = _
    +  private [statistics] var _mean: Double = _
    +  private [statistics] var _variance: Double = _
    +  private [statistics] var _counts: mutable.HashMap[Double, Int] = _
    +
    --- End diff --

    Could you clarify this point? They're already package-private fields.
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user sachingoel0101 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/861#discussion_r37141076

    --- Diff: docs/libs/ml/statistics.md ---
    @@ -0,0 +1,108 @@
    +---
    +mathjax: include
    +htmlTitle: FlinkML - Statistics
    +title: <a href="../ml">FlinkML</a> - Statistics
    +---
    +<!--
    +Licensed to the Apache Software Foundation (ASF) under one
    +or more contributor license agreements. See the NOTICE file
    +distributed with this work for additional information
    +regarding copyright ownership. The ASF licenses this file
    +to you under the Apache License, Version 2.0 (the
    +"License"); you may not use this file except in compliance
    +with the License. You may obtain a copy of the License at
    +
    +  http://www.apache.org/licenses/LICENSE-2.0
    +
    +Unless required by applicable law or agreed to in writing,
    +software distributed under the License is distributed on an
    +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    +KIND, either express or implied. See the License for the
    +specific language governing permissions and limitations
    +under the License.
    +-->
    +
    +* This will be replaced by the TOC
    +{:toc}
    +
    +## Description
    +
    + The statistics utility provides features such as building histograms over data, determining
    + mean, variance, Gini impurity, entropy etc. of data.
    +
    +## Methods
    +
    + The statistics utility provides two major functions: `createHistogram` and `dataStats`.
    +
    +### Creating a histogram
    +
    + There are two types of histograms:
    + <ul>
    +  <li>
    +   <strong>Continuous Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
    +   when the values in `X` are from a continuous range. These histograms support
    +   `quantile` and `sum` operations. Here `quantile(q)` refers to a value $x_q$ such that $|x: x
    +   \leq x_q| = q * |X|$. Further, `sum(s)` refers to the number of elements $x \leq s$, which can
    +   be construed as a cumulative probability value at $s$ [of course, a <i>scaled</i> probability].
    +   <br>
    +   A continuous histogram can be formed by calling `X.createHistogram(b)` where `b` is the
    +   number of bins.
    +  </li>
    +  <li>
    +   <strong>Categorical Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
    +   when the values in `X` are from a discrete distribution. These histograms
    +   support the `count(c)` operation, which returns the number of elements associated with category `c`.
    +   <br>
    +   A categorical histogram can be formed by calling `X.createHistogram(0)`.
    +  </li>
    + </ul>
    +
    +### Data Statistics
    +
    + The `dataStats` function operates on a data set `X: DataSet[Vector]` and returns column-wise
    + statistics for `X`. Every field of `X` is allowed to be defined as either <i>discrete</i> or
    + <i>continuous</i>.
    + <br>
    + Statistics can be evaluated by calling `DataStats.dataStats(X)` or
    + `DataStats.dataStats(X, discreteFields)`. The latter is used when some fields need to be
    + declared discrete-valued, and is provided as an array of indices of the fields which are discrete.
    + <br>
    + The following information is available as part of `DataStats`:
    + <ul>
    +  <li>Number of elements in `X`</li>
    +  <li>Dimension of `X`</li>
    +  <li>Column-wise statistics where for discrete fields, we report counts for each category, and
    +   the Gini impurity and entropy of the field, while for continuous fields, we report the
    +   minimum, maximum, mean and variance.
    +  </li>
    + </ul>
    +
    +## Examples
    +
    +{% highlight scala %}
    +
    +import org.apache.flink.ml.statistics._
    +import org.apache.flink.ml._
    +
    +val X: DataSet[Double] = ...
    +// Create continuous histogram
    +val histogram = X.createHistogram(5) // creates a histogram with five bins
    +histogram.quantile(0.3) // returns the 30th quantile
    +histogram.sum(4) // returns number of elements less than 4
    --- End diff --

    You're right. I'm wondering if we should provide two methods for this purpose. Otherwise the user will need to cast into the appropriate class.
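To make the `quantile`/`sum` semantics from the documentation diff concrete, here is a hedged sketch that computes both exactly on a sorted sample (hypothetical `HistogramSketch` class in plain Java, not Flink's binned implementation, which only approximates these values):

```java
import java.util.Arrays;

public class HistogramSketch {
    private final double[] sorted;

    HistogramSketch(double[] data) {
        sorted = data.clone();
        Arrays.sort(sorted);
    }

    // sum(s): number of elements x <= s, found by binary search
    // for the first index whose value exceeds s.
    int sum(double s) {
        int lo = 0, hi = sorted.length;
        while (lo < hi) {
            int mid = (lo + hi) / 2;
            if (sorted[mid] <= s) lo = mid + 1; else hi = mid;
        }
        return lo;
    }

    // quantile(q): smallest x_q with |{x : x <= x_q}| >= q * n.
    double quantile(double q) {
        int idx = (int) Math.ceil(q * sorted.length) - 1;
        return sorted[Math.max(0, idx)];
    }
}
```

For `data = {1, 2, 3, 4, 5}`, `sum(4)` is 4 and `quantile(0.3)` is 2, matching the cumulative-count reading described in the docs.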
[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...
Github user sachingoel0101 commented on the pull request:

    https://github.com/apache/flink/pull/1003#issuecomment-131477326

    A stand-alone parameter server service would require setting up and tearing down a client every time the user, say, opens and closes a Rich function while using it. Further, it means we have to add another dependency when the same could be accomplished using Akka.

    In this implementation, the parameter *server* runs at every task manager [effectively it acts as a client serving all running tasks at one node. In fact, in this sense, there are no servers, just clients at every worker, which are managed by the Job Manager]. This in itself means less data transfer over the network, since every *server* will usually be the owner of a key and can serve its *clients* faster instead of every request going over the network. Further, it is completely distributed, and every task manager maintains its own *server* and sets it up or tears it down along with itself. As far as including it in the core itself is concerned, there isn't much of it: there are the 3-4 odd functions added directly in the Runtime context, which effectively serve as an interface.

    @tillrohrmann, could you weigh in here on whether this is the intended use of a PS in ML algorithms? I can easily see this working with, for example, the regression algorithm.

    The reason I included it in the runtime is that there will be no extra failure modes now: if the TaskManager is alive, the Parameter Server at that client will be alive. Further, the Job Manager manages the servers and determines where each key will go [which will be crucial to recovery], something which can be very hard to determine in a completely de-centralized manner (I couldn't think of a foolproof way). This ensures that the server is running only on the workers where it's needed, and only if it is needed. Keeping the Job Manager in the loop also ensures that recovery is easy. If a Task Manager fails, the Job Manager knows which server failed by matching the `InstanceID`s and can kick off the recovery process from the duplicate server. [This is not implemented yet.] A stand-alone PS would add another master-node system in parallel to the JobManager-TaskManager system, which can itself be used efficiently for this purpose. Of course, this doesn't matter if we use an external key-value store. I will have a look at #967 and see how the two can be integrated.

    I had a look at an open implementation done for Spark: https://github.com/apache/spark/compare/branch-1.3...chouqin:ps-on-spark-1.3. This adds a separate context and a function on RDD to access the PS, and does require running a service inside the core environment.
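As a rough illustration of the per-TaskManager store idea described above (all names and the API are hypothetical, not the interface proposed in the PR), the local shard of such a key-value store boils down to a concurrent map that serves co-located tasks directly; routing non-local keys over the network is the part left out:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: each worker hosts one shard of the parameter
// store and answers local tasks without a network round trip.
public class LocalParameterStore {
    private final Map<String, double[]> shard = new ConcurrentHashMap<>();

    // Overwrite the parameter vector stored under a key.
    public void update(String key, double[] value) {
        shard.put(key, value);
    }

    // Fetch the current parameter vector, or null if not local.
    public double[] fetch(String key) {
        return shard.get(key);
    }

    // Whether this shard owns the key (in the PR, the Job Manager
    // would decide key placement; here ownership is just presence).
    public boolean owns(String key) {
        return shard.containsKey(key);
    }
}
```

In the design discussed in the thread, the master (Job Manager) would track which worker owns which key; the sketch treats ownership as simple local presence.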
[jira] [Commented] (FLINK-2529) fix on some unused code for flink-runtime
[ https://issues.apache.org/jira/browse/FLINK-2529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698529#comment-14698529 ]

ASF GitHub Bot commented on FLINK-2529:
---------------------------------------

GitHub user HuangWHWHW opened a pull request:

    https://github.com/apache/flink/pull/1022

    [FLINK-2529][runtime]remove some unused code

    There are some reviews:
    1. The variable `consumerGraph` is never used in `public Boolean call() throws Exception`.
    2. To my knowledge, `Thread.currentThread()` will never return null, so the check `shutdownHook != null` is unneeded. I'm not completely sure though; maybe I'm wrong.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HuangWHWHW/flink FLINK-2529

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1022.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1022

commit f630fe9cf28ac734a472134d907e635693f00ad0
Author: HuangWHWHW <404823...@qq.com>
Date: 2015-08-16T03:56:18Z

    [FLINK-2529][runtime]remove some unused code

fix on some unused code for flink-runtime
-----------------------------------------
                 Key: FLINK-2529
                 URL: https://issues.apache.org/jira/browse/FLINK-2529
             Project: Flink
          Issue Type: Improvement
          Components: Local Runtime
    Affects Versions: 0.10
            Reporter: Huang Wei
            Priority: Minor
             Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h
[GitHub] flink pull request: [FLINK-2529][runtime]remove some unused code
GitHub user HuangWHWHW opened a pull request:

    https://github.com/apache/flink/pull/1022

    [FLINK-2529][runtime]remove some unused code

    There are some reviews:
    1. The variable `consumerGraph` is never used in `public Boolean call() throws Exception`.
    2. To my knowledge, `Thread.currentThread()` will never return null, so the check `shutdownHook != null` is unneeded. I'm not completely sure though; maybe I'm wrong.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HuangWHWHW/flink FLINK-2529

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1022.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1022

commit f630fe9cf28ac734a472134d907e635693f00ad0
Author: HuangWHWHW <404823...@qq.com>
Date: 2015-08-16T03:56:18Z

    [FLINK-2529][runtime]remove some unused code
[GitHub] flink pull request: [FLINK-2478]fix the array may have out of boun...
GitHub user Rucongzhang opened a pull request:

    https://github.com/apache/flink/pull/1021

    [FLINK-2478]fix the array may have out of bounds

    In the getNestedDelta function, the array lengths of oldDatapoint and newDatapoint are not guaranteed to be the same, so I added a check for this and fixed the problem. Please merge this into the master branch. Thank you.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Rucongzhang/flink FLINK-2487

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1021.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1021

commit e68c86f4ce8ae2d1e2bac8ab432fc93b5f52f756
Author: Rucongzhang <zhangruc...@huawei.com>
Date: 2015-08-15T09:25:25Z

    [FLINK-2478]fix the array may have out of bounds
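The guard described in the PR can be sketched as follows (a hypothetical standalone method shown for illustration; the actual fix lives in Flink's `getNestedDelta`, whose exact delta formula is not shown in the thread, so a Euclidean distance stands in for it here):

```java
public class DeltaSketch {

    // Reject mismatched dimensions up front instead of letting the
    // loop index run past the end of the shorter array.
    static double euclideanDelta(double[] oldPoint, double[] newPoint) {
        if (oldPoint.length != newPoint.length) {
            throw new IllegalArgumentException(
                "Data points must have the same dimension: "
                + oldPoint.length + " vs " + newPoint.length);
        }
        double sum = 0.0;
        for (int i = 0; i < oldPoint.length; i++) {
            double d = oldPoint[i] - newPoint[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }
}
```

Without the length check, iterating over `newPoint.length` while indexing `oldPoint` would throw `ArrayIndexOutOfBoundsException` whenever the new point has more components, which is the bug class the PR addresses.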
[jira] [Commented] (FLINK-2478) The page “FlinkML - Machine Learning for Flink“ https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a dead link
[ https://issues.apache.org/jira/browse/FLINK-2478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698207#comment-14698207 ]

ASF GitHub Bot commented on FLINK-2478:
---------------------------------------

GitHub user Rucongzhang opened a pull request:

    https://github.com/apache/flink/pull/1021

    [FLINK-2478]fix the array may have out of bounds

    In the getNestedDelta function, the array lengths of oldDatapoint and newDatapoint are not guaranteed to be the same, so I added a check for this and fixed the problem. Please merge this into the master branch. Thank you.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Rucongzhang/flink FLINK-2487

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1021.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1021

commit e68c86f4ce8ae2d1e2bac8ab432fc93b5f52f756
Author: Rucongzhang <zhangruc...@huawei.com>
Date: 2015-08-15T09:25:25Z

    [FLINK-2478]fix the array may have out of bounds

The page "FlinkML - Machine Learning for Flink" https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a dead link
------------------------------------------------------------------------------------------------------------------------------------
                 Key: FLINK-2478
                 URL: https://issues.apache.org/jira/browse/FLINK-2478
             Project: Flink
          Issue Type: Task
          Components: Documentation
    Affects Versions: 0.10
            Reporter: Slim Baltagi
            Assignee: Till Rohrmann
            Priority: Minor

"Note that FlinkML is currently not part of the binary distribution. See linking with it for cluster execution here." The word 'here' links to a dead link:
https://ci.apache.org/projects/flink/flink-docs-master/libs/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
The correct link is:
https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
[jira] [Created] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set
Gabor Gevay created FLINK-2527:
----------------------------------

             Summary: If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set
                 Key: FLINK-2527
                 URL: https://issues.apache.org/jira/browse/FLINK-2527
             Project: Flink
          Issue Type: Bug
          Components: Gelly
            Reporter: Gabor Gevay
            Assignee: Gabor Gevay
             Fix For: 0.10, 0.9.1

The problem is that if setNewVertexValue is called more than once, it sends each new value to the out Collector, and these all end up in the workset, but then the coGroups in the two descendants of MessagingUdfWithEdgeValues use only the first value in the state Iterable.

I see three ways to resolve this:
1. Add it to the documentation that setNewVertexValue should only be called once, and optionally add a check for this.
2. In setNewVertexValue, do not send the newValue to the out Collector at once, but only record it in outVal, and send the last recorded value after updateVertex returns.
3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need some documentation addition.)

I like 2. the best. What are your opinions?
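Option 2 from the issue, buffering the value and emitting only the last one after updateVertex returns, can be sketched roughly like this (hypothetical names and a plain list standing in for the Gelly Collector, not the actual Gelly API):

```java
import java.util.ArrayList;
import java.util.List;

public class VertexUpdateSketch {
    private Double pendingValue;            // last value set, not yet emitted
    private final List<Double> workset = new ArrayList<>();

    // User-facing setter: overwrite the buffered value, emit nothing yet.
    public void setNewVertexValue(double v) {
        pendingValue = v;
    }

    // Framework side: wraps the user's updateVertex(...) call and emits
    // at most one record per vertex, regardless of how often the user
    // called setNewVertexValue.
    public void runUpdate(Runnable userUpdateVertex) {
        pendingValue = null;
        userUpdateVertex.run();
        if (pendingValue != null) {
            workset.add(pendingValue);
        }
    }

    public List<Double> getWorkset() {
        return workset;
    }
}
```

With this structure, two calls to `setNewVertexValue` inside one update produce a single workset record holding the second value, which is exactly the behavior option 2 asks for.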
[jira] [Commented] (FLINK-2526) Add catch{} for task when it stop running
[ https://issues.apache.org/jira/browse/FLINK-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698160#comment-14698160 ]

ASF GitHub Bot commented on FLINK-2526:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1020#discussion_r37134398

    --- Diff: flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java ---
    @@ -100,9 +100,27 @@ public void invoke() throws Exception {
    			this.isRunning = false;
    			// Cleanup
    			inputProcessor.clearBuffers();
    -			inputProcessor.cleanup();
    -			outputHandler.flushOutputs();
    -			clearBuffers();
    +
    +			try {
    +				inputProcessor.cleanup();
    +			}
    +			catch (Exception e) {
    +				LOG.warn("Clean up input processor failed.");
    --- End diff --

    the exception message shouldn't be discarded.

Add catch{} for task when it stop running
-----------------------------------------
                 Key: FLINK-2526
                 URL: https://issues.apache.org/jira/browse/FLINK-2526
             Project: Flink
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 0.8.1
            Reporter: fangfengbin
            Assignee: fangfengbin
            Priority: Minor
[jira] [Commented] (FLINK-2526) Add catch{} for task when it stop running
[ https://issues.apache.org/jira/browse/FLINK-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698179#comment-14698179 ]

ASF GitHub Bot commented on FLINK-2526:
---------------------------------------

Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1020#issuecomment-131314931

    @zentol Hello! Thank you for your suggestion. I have added the exception message to the log.

Add catch{} for task when it stop running
-----------------------------------------
                 Key: FLINK-2526
                 URL: https://issues.apache.org/jira/browse/FLINK-2526
             Project: Flink
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 0.8.1
            Reporter: fangfengbin
            Assignee: fangfengbin
            Priority: Minor
[GitHub] flink pull request: [FLINK-2526]Add try-catch for task when it sto...
Github user ffbin commented on the pull request:

    https://github.com/apache/flink/pull/1020#issuecomment-131314931

    @zentol Hello! Thank you for your suggestion. I have added the exception message to the log.
[jira] [Created] (FLINK-2526) Add catch{} for task when it stop running
fangfengbin created FLINK-2526:
----------------------------------

             Summary: Add catch{} for task when it stop running
                 Key: FLINK-2526
                 URL: https://issues.apache.org/jira/browse/FLINK-2526
             Project: Flink
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 0.8.1
            Reporter: fangfengbin
            Assignee: fangfengbin
            Priority: Minor
[GitHub] flink pull request: Some updates for programming_guide.md
GitHub user chenliang613 opened a pull request:

    https://github.com/apache/flink/pull/1019

    Some updates for programming_guide.md

    Some updates for the section "Program Skeleton" of programming_guide.md.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/chenliang613/flink patch-2

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1019.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1019

commit a0b17ead729511a856609a6ac80033d30f1dce71
Author: CHEN LIANG <chenliang...@sina.cn>
Date: 2015-08-15T07:04:08Z

    Some updates for programming_guide.md
[GitHub] flink pull request: [FLINK-2526]Add try-catch for task when it sto...
GitHub user ffbin opened a pull request:

    https://github.com/apache/flink/pull/1020

    [FLINK-2526]Add try-catch for task when it stop running

    `inputProcessor.cleanup()` may throw an IOException. If we do not catch it, the next line, `outputHandler.flushOutputs()`, will not run, causing output data loss. So I think adding some try-catch blocks is necessary.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ffbin/flink FLINK-2526

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1020.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1020

commit f846f56198c921f9285bce4fb25392d9d6e2a827
Author: ffbin <869218...@qq.com>
Date: 2015-08-15T06:40:16Z

    [FLINK-2526]Add catch{} for task when it stop running
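The intent of this PR, isolating each cleanup step so that a failure in one does not skip the others, can be sketched with a small hypothetical helper (not Flink code; note the reviewer's point elsewhere in the thread that the caught exception itself should be passed to the logger, not just a message):

```java
public class CleanupSketch {

    // A cleanup step that may fail, like inputProcessor.cleanup().
    interface Step {
        void run() throws Exception;
    }

    // Run every step even if earlier ones fail; return the failure count.
    static int runAll(Step... steps) {
        int failures = 0;
        for (Step step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                failures++;
                // In Flink this would be LOG.warn("Cleanup step failed.", e)
                // so the stack trace is preserved.
                System.err.println("Cleanup step failed: " + e);
            }
        }
        return failures;
    }
}
```

With this shape, a throwing `cleanup()` no longer prevents the equivalent of `flushOutputs()` from running, which is the data-loss scenario the PR description warns about.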
[jira] [Commented] (FLINK-2526) Add catch{} for task when it stop running
[ https://issues.apache.org/jira/browse/FLINK-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698153#comment-14698153 ]

ASF GitHub Bot commented on FLINK-2526:
---------------------------------------

GitHub user ffbin opened a pull request:

    https://github.com/apache/flink/pull/1020

    [FLINK-2526]Add try-catch for task when it stop running

    `inputProcessor.cleanup()` may throw an IOException. If we do not catch it, the next line, `outputHandler.flushOutputs()`, will not run, causing output data loss. So I think adding some try-catch blocks is necessary.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ffbin/flink FLINK-2526

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/1020.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #1020

commit f846f56198c921f9285bce4fb25392d9d6e2a827
Author: ffbin <869218...@qq.com>
Date: 2015-08-15T06:40:16Z

    [FLINK-2526]Add catch{} for task when it stop running

Add catch{} for task when it stop running
-----------------------------------------
                 Key: FLINK-2526
                 URL: https://issues.apache.org/jira/browse/FLINK-2526
             Project: Flink
          Issue Type: Bug
          Components: Streaming
    Affects Versions: 0.8.1
            Reporter: fangfengbin
            Assignee: fangfengbin
            Priority: Minor
[jira] [Commented] (FLINK-2334) IOException: Channel to path could not be opened
[ https://issues.apache.org/jira/browse/FLINK-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698362#comment-14698362 ] Stephan Ewen commented on FLINK-2334: - Is this problem actually reproducible, or was this a one time occurrence? IOException: Channel to path could not be opened Key: FLINK-2334 URL: https://issues.apache.org/jira/browse/FLINK-2334 Project: Flink Issue Type: Bug Affects Versions: 0.9 Environment: local and cluster environment; Linux and MacOS Reporter: David Heller Priority: Minor We've encountered an IOException due to missing temporary files (see stacktrace at the bottom). It occurred both in local and cluster execution and on MacOS as well as on linux. Data size does not seem to be the reason: the error occurred on a 6.5GB dataset as well as on a small 400MB dataset. Our code uses Bulk iterations and the suspicion is that cached build-side files are accidentally removed too early. As far as we observed it, the exception always happens in an iteration later than the first one (mostly iteration 2). Interestingly, on one occasion the error disappeared consistently when setting the number of maximum iterations higher (from 2 to 6). On another occasion, the exception appeared when adding a simple map operator at the end (holding the identity function). Generally, the error is quite hard to reproduce. The stacktrace: java.io.IOException: Channel to path '/var/folders/xx/0dd3w4jd7fbb4ytmhqxm157hgn/T/flink-io-f5061483-ff59-43dc-883f-79af813d5804/19a70637e025c7ee3919b30239060895.23.channel' could not be opened. 
at org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.&lt;init&gt;(AbstractFileIOChannel.java:61) at org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.&lt;init&gt;(AsynchronousFileIOChannel.java:86) at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.&lt;init&gt;(AsynchronousBulkBlockReader.java:46) at org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.&lt;init&gt;(AsynchronousBulkBlockReader.java:39) at org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.createBulkBlockChannelReader(IOManagerAsync.java:263) at org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:751) at org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508) at org.apache.flink.runtime.operators.hash.ReOpenableMutableHashTable.prepareNextPartition(ReOpenableMutableHashTable.java:167) at org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544) at org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104) at org.apache.flink.runtime.operators.AbstractCachedBuildSideMatchDriver.run(AbstractCachedBuildSideMatchDriver.java:155) at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139) at org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92) at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) at java.lang.Thread.run(Thread.java:745) Caused by: java.io.FileNotFoundException: /var/folders/xx/0dd3w4jd7fbb4ytmhqxm157hgn/T/flink-io-f5061483-ff59-43dc-883f-79af813d5804/19a70637e025c7ee3919b30239060895.23.channel (No such file or directory) at
java.io.RandomAccessFile.open0(Native Method) at java.io.RandomAccessFile.open(RandomAccessFile.java:316) at java.io.RandomAccessFile.&lt;init&gt;(RandomAccessFile.java:243) at java.io.RandomAccessFile.&lt;init&gt;(RandomAccessFile.java:124) at org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.&lt;init&gt;(AbstractFileIOChannel.java:57) ... 16 more
[jira] [Commented] (FLINK-2478) The page “FlinkML - Machine Learning for Flink“ https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a dead link
[ https://issues.apache.org/jira/browse/FLINK-2478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698363#comment-14698363 ] ASF GitHub Bot commented on FLINK-2478: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1021#issuecomment-131402990 If the two arrays are of different lengths, should this throw an exception? After all, the cosine distance is not well defined then and returning 0 may give the impression that things are correct, when they are not. The page “FlinkML - Machine Learning for Flink“ https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a dead link - Key: FLINK-2478 URL: https://issues.apache.org/jira/browse/FLINK-2478 Project: Flink Issue Type: Task Components: Documentation Affects Versions: 0.10 Reporter: Slim Baltagi Assignee: Till Rohrmann Priority: Minor Note that FlinkML is currently not part of the binary distribution. See linking with it for cluster execution here. 'here' links to a dead link: https://ci.apache.org/projects/flink/flink-docs-master/libs/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution The correct link is: https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
[GitHub] flink pull request: [FLINK-2478]fix the array may have out of boun...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1021#issuecomment-131402990 If the two arrays are of different lengths, should this throw an exception? After all, the cosine distance is not well defined then and returning 0 may give the impression that things are correct, when they are not.
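Stephan's suggestion above, failing fast on mismatched lengths instead of returning 0, can be sketched as follows. The function name and signature here are assumptions for illustration, not the actual code from PR #1021:

```scala
// Cosine distance is undefined for vectors of different lengths,
// so throw (via require) instead of silently returning 0.
def cosineDistance(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length, s"Array lengths differ: ${a.length} vs ${b.length}")
  val dot = a.zip(b).map { case (x, y) => x * y }.sum
  val normA = math.sqrt(a.map(x => x * x).sum)
  val normB = math.sqrt(b.map(x => x * x).sum)
  require(normA > 0 && normB > 0, "Cosine distance is undefined for zero vectors")
  1.0 - dot / (normA * normB)
}
```

With this shape, a caller passing arrays of different lengths gets an `IllegalArgumentException` rather than a misleadingly "correct" 0.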
[GitHub] flink pull request: Some updates for programming_guide.md
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1019#issuecomment-131403119 Most of this is good, but in parts, it confuses the `DataSet` and `DataStream` API. The `DataSet` API has no functionality to write to sockets.
[GitHub] flink pull request: Stale Synchronous Parallel Iterations
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/967#issuecomment-131403208 @nltran How would your use of the parameter server work together with what is proposed in #1003?
[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1003#issuecomment-131403660 This is again baked into the Flink runtime. Is there a way to keep this separate? I am still a bit puzzled whether this is the pattern with which people use parameter servers: embedded in the workers that operate on the training data. My impression so far was that parameter servers are a separate component, usually running on different machines. Also, could you comment on how this could work with the changes proposed in #967?
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37137570 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala --- @@ -119,4 +120,30 @@ object MLUtils { stringRepresentation.writeAsText(filePath) } + + /** Create a [[DataSet]] of [[OnlineHistogram]] from the input data +* +* @param bins Number of bins required. Zero for [[CategoricalHistogram]] +* @param data input [[DataSet]] of [[Double]] +* @return [[DataSet]] of [[OnlineHistogram]] +*/ + private [ml] def createHistogram(data: DataSet[Double], bins: Int): DataSet[OnlineHistogram] = { +val min = data.map(x => x).reduce((x,y) => Math.min(x,y)) +val max = data.map(x => x).reduce((x,y) => Math.max(x,y)) --- End diff -- We don't need the `map(x => x)` operation, and we need a space between `x,` and `y`.
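The redundant identity `map` the review points out can simply be dropped, since `reduce` alone computes the extremum. A plain-Scala illustration on a local collection (used here purely for illustration in place of a Flink `DataSet`, whose Scala `reduce` takes the same `(T, T) => T` shape):

```scala
// reduce alone suffices; map(x => x) only adds a no-op stage
val data = Seq(3.0, 1.0, 2.0)
val min = data.reduce((x, y) => Math.min(x, y))
val max = data.reduce((x, y) => Math.max(x, y))
```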
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37137605 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.statistics + +import scala.collection.mutable + +/** Implementation of a discrete valued online histogram + * + * =Parameters= + * -[[numCategories]]: + * Number of categories in the histogram + */ +case class CategoricalHistogram(numCategories: Int) extends OnlineHistogram { + + require(numCategories > 0, "Capacity must be greater than zero") + val data = new mutable.HashMap[Double, Int]() + + /** Number of categories in the histogram +* +* @return number of categories +*/ + override def bins: Int = { +numCategories + } + + /** Increment count of category c +* +* @param c category whose count needs to be incremented +*/ + override def add(c: Double): Unit = { +if (data.get(c).isEmpty) { + require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+ data.put(c, 1) +} else { + data.update(c, data.get(c).get + 1) +} + } + + /** Merges the histogram with h and returns a new histogram +* +* @param h histogram to be merged +* @param B number of categories in the resultant histogram. +* (Default: ```0```, number of categories will be the size of union of categories in +* both histograms) +* @return Merged histogram with capacity B +*/ + override def merge(h: OnlineHistogram, B: Int = 0): CategoricalHistogram = { +h match { + case h1: CategoricalHistogram => { +val finalMap = new mutable.HashMap[Double, Int]() +data.iterator.foreach(x => finalMap.put(x._1, x._2)) +h1.data.iterator.foreach(x => { + if (finalMap.get(x._1).isEmpty) { +finalMap.put(x._1, x._2) + } else { +finalMap.update(x._1, x._2 + finalMap.get(x._1).get) + } +}) --- End diff -- As I said above, we can reduce calling `get(x._1)` by using pattern matching for `finalMap.get(x._1)`.
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37137592 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.statistics + +import scala.collection.mutable + +/** Implementation of a discrete valued online histogram + * + * =Parameters= + * -[[numCategories]]: + * Number of categories in the histogram + */ +case class CategoricalHistogram(numCategories: Int) extends OnlineHistogram { + + require(numCategories > 0, "Capacity must be greater than zero") + val data = new mutable.HashMap[Double, Int]() + + /** Number of categories in the histogram +* +* @return number of categories +*/ + override def bins: Int = { +numCategories + } + + /** Increment count of category c +* +* @param c category whose count needs to be incremented +*/ + override def add(c: Double): Unit = { +if (data.get(c).isEmpty) { + require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+ data.put(c, 1) +} else { + data.update(c, data.get(c).get + 1) +} --- End diff -- If we use pattern matching for `data.get(c)`, we can reduce calls to `data.get(c)`. For example:

```scala
data.get(c) match {
  case None =>
    require(data.size < numCategories, "Insufficient capacity. Failed to add.")
    data.put(c, 1)
  case Some(v) => data.update(c, v + 1)
}
```
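For comparison, another common way to avoid the double lookup the review mentions is `getOrElse`: one read of the map, then one write. A minimal standalone sketch, not part of the PR (the capacity check is omitted here for brevity):

```scala
// One read (getOrElse) plus one write per add, instead of repeated data.get(c)
val data = scala.collection.mutable.HashMap[Double, Int]()
def add(c: Double): Unit = data(c) = data.getOrElse(c, 0) + 1

add(1.5); add(1.5); add(2.0)
```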
[GitHub] flink pull request: Some updates for programming_guide.md
Github user chenliang613 commented on the pull request: https://github.com/apache/flink/pull/1019#issuecomment-131405427 ok, update the sockets part.
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37137631 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.statistics + +import scala.collection.mutable + +/** Implementation of a discrete valued online histogram + * + * =Parameters= + * -[[numCategories]]: + * Number of categories in the histogram + */ +case class CategoricalHistogram(numCategories: Int) extends OnlineHistogram { + + require(numCategories > 0, "Capacity must be greater than zero") + val data = new mutable.HashMap[Double, Int]() + + /** Number of categories in the histogram +* +* @return number of categories +*/ + override def bins: Int = { +numCategories + } + + /** Increment count of category c +* +* @param c category whose count needs to be incremented +*/ + override def add(c: Double): Unit = { +if (data.get(c).isEmpty) { + require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+ data.put(c, 1) +} else { + data.update(c, data.get(c).get + 1) +} + } + + /** Merges the histogram with h and returns a new histogram +* +* @param h histogram to be merged +* @param B number of categories in the resultant histogram. +* (Default: ```0```, number of categories will be the size of union of categories in +* both histograms) +* @return Merged histogram with capacity B +*/ + override def merge(h: OnlineHistogram, B: Int = 0): CategoricalHistogram = { +h match { + case h1: CategoricalHistogram => { +val finalMap = new mutable.HashMap[Double, Int]() +data.iterator.foreach(x => finalMap.put(x._1, x._2)) +h1.data.iterator.foreach(x => { + if (finalMap.get(x._1).isEmpty) { +finalMap.put(x._1, x._2) + } else { +finalMap.update(x._1, x._2 + finalMap.get(x._1).get) + } +}) +require(B == 0 || finalMap.size <= B, "Insufficient capacity. Failed to merge") +val countBuffer = new mutable.ArrayBuffer[(Double, Int)]() +finalMap.iterator.foreach(x => countBuffer += ((x._1, x._2))) +var finalSize = finalMap.size +if (B > 0) { + finalSize = B +} +val ret = new CategoricalHistogram(finalSize) +ret.loadData(countBuffer.toArray) --- End diff -- Why should the data in `finalMap` be copied to `countBuffer`?

```scala
val finalSize = if (B > 0) B else finalMap.size
val ret = new CategoricalHistogram(finalSize)
ret.loadData(finalMap.toArray)
```
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37137647 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.statistics + +import scala.collection.mutable + +/** Implementation of a discrete valued online histogram + * + * =Parameters= + * -[[numCategories]]: + * Number of categories in the histogram + */ +case class CategoricalHistogram(numCategories: Int) extends OnlineHistogram { + + require(numCategories > 0, "Capacity must be greater than zero") + val data = new mutable.HashMap[Double, Int]() + + /** Number of categories in the histogram +* +* @return number of categories +*/ + override def bins: Int = { +numCategories + } + + /** Increment count of category c +* +* @param c category whose count needs to be incremented +*/ + override def add(c: Double): Unit = { +if (data.get(c).isEmpty) { + require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+ data.put(c, 1) +} else { + data.update(c, data.get(c).get + 1) +} + } + + /** Merges the histogram with h and returns a new histogram +* +* @param h histogram to be merged +* @param B number of categories in the resultant histogram. +* (Default: ```0```, number of categories will be the size of union of categories in +* both histograms) +* @return Merged histogram with capacity B +*/ + override def merge(h: OnlineHistogram, B: Int = 0): CategoricalHistogram = { +h match { + case h1: CategoricalHistogram => { +val finalMap = new mutable.HashMap[Double, Int]() +data.iterator.foreach(x => finalMap.put(x._1, x._2)) +h1.data.iterator.foreach(x => { + if (finalMap.get(x._1).isEmpty) { +finalMap.put(x._1, x._2) + } else { +finalMap.update(x._1, x._2 + finalMap.get(x._1).get) + } +}) +require(B == 0 || finalMap.size <= B, "Insufficient capacity. Failed to merge") +val countBuffer = new mutable.ArrayBuffer[(Double, Int)]() +finalMap.iterator.foreach(x => countBuffer += ((x._1, x._2))) +var finalSize = finalMap.size +if (B > 0) { + finalSize = B +} +val ret = new CategoricalHistogram(finalSize) +ret.loadData(countBuffer.toArray) +ret + } + case default => +throw new RuntimeException("Only a categorical histogram is allowed to be merged with a " + "categorical histogram") +} + } + + /** Number of elements in category c +* +* @return Number of points in category c +*/ + def count(c: Double): Int = { +if (data.get(c).isEmpty) { + return 0 +} +data.get(c).get + } --- End diff -- I think the following is better than the current implementation:

```scala
def count(c: Double): Int = data.get(c) match {
  case None => 0
  case Some(v) => v
}
```
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37137658 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.ml.statistics + +import scala.collection.mutable + +/** Implementation of a discrete valued online histogram + * + * =Parameters= + * -[[numCategories]]: + * Number of categories in the histogram + */ +case class CategoricalHistogram(numCategories: Int) extends OnlineHistogram { + + require(numCategories > 0, "Capacity must be greater than zero") + val data = new mutable.HashMap[Double, Int]() + + /** Number of categories in the histogram +* +* @return number of categories +*/ + override def bins: Int = { +numCategories + } + + /** Increment count of category c +* +* @param c category whose count needs to be incremented +*/ + override def add(c: Double): Unit = { +if (data.get(c).isEmpty) { + require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+ data.put(c, 1) +} else { + data.update(c, data.get(c).get + 1) +} + } + + /** Merges the histogram with h and returns a new histogram +* +* @param h histogram to be merged +* @param B number of categories in the resultant histogram. +* (Default: ```0```, number of categories will be the size of union of categories in +* both histograms) +* @return Merged histogram with capacity B +*/ + override def merge(h: OnlineHistogram, B: Int = 0): CategoricalHistogram = { +h match { + case h1: CategoricalHistogram => { +val finalMap = new mutable.HashMap[Double, Int]() +data.iterator.foreach(x => finalMap.put(x._1, x._2)) +h1.data.iterator.foreach(x => { + if (finalMap.get(x._1).isEmpty) { +finalMap.put(x._1, x._2) + } else { +finalMap.update(x._1, x._2 + finalMap.get(x._1).get) + } +}) +require(B == 0 || finalMap.size <= B, "Insufficient capacity. Failed to merge") +val countBuffer = new mutable.ArrayBuffer[(Double, Int)]() +finalMap.iterator.foreach(x => countBuffer += ((x._1, x._2))) +var finalSize = finalMap.size +if (B > 0) { + finalSize = B +} +val ret = new CategoricalHistogram(finalSize) +ret.loadData(countBuffer.toArray) +ret + } + case default => +throw new RuntimeException("Only a categorical histogram is allowed to be merged with a " + "categorical histogram") +} + } + + /** Number of elements in category c +* +* @return Number of points in category c +*/ + def count(c: Double): Int = { +if (data.get(c).isEmpty) { + return 0 +} +data.get(c).get + } + + /** Returns the total number of elements in the histogram +* +* @return total number of elements added so far +*/ + override def total: Int = { +var result = 0 +data.valuesIterator.foreach(x => result += x) +result + } --- End diff -- We can implement the `total` method without a mutable variable:

```scala
override def total: Int = data.valuesIterator.sum
```
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37137718 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.ml.statistics + +import scala.Double.MaxValue +import scala.collection.mutable + +/** Implementation of a continuous valued online histogram + * Adapted from Ben-Haim and Yom-Tov's Streaming Decision Tree Algorithm + * Refer http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf + * + * =Parameters= + * -[[capacity]]: + * Number of bins to be used in the histogram + * + * -[[min]]: + * Lower limit on the elements + * + * -[[max]]: + * Upper limit on the elements + */ +case class ContinuousHistogram(capacity: Int, min: Double, max: Double) extends OnlineHistogram { + + require(capacity > 0, "Capacity should be a positive integer") + require(lower < upper, "Lower must be less than upper") + + val data = new mutable.ArrayBuffer[(Double, Int)]() + + /** Adds a new item to the histogram +* +* @param p value to be added +*/ + override def add(p: Double): Unit = { +require(p > lower && p < upper, p + " not in (" + lower + ", " + upper + ")") +// search for the index where the value is just higher than p +val search = find(p) +// add the new value there, shifting everything to the right +data.insert(search, (p, 1)) +// If we're over capacity or any two elements are within 1e-9 of each other, merge.
+// This will take care of the case if p was actually equal to some value in the histogram and +// just increment the value there +mergeElements() + } + + /** Merges the histogram with h and returns a histogram with capacity B +* +* @param h histogram to be merged +* @param B capacity of the resultant histogram +* @return Merged histogram with capacity B +*/ + override def merge(h: OnlineHistogram, B: Int): ContinuousHistogram = { +h match { + case temp: ContinuousHistogram => { +val m: Int = bins +val n: Int = temp.bins +var i, j: Int = 0 +val mergeList = new mutable.ArrayBuffer[(Double, Int)]() +while (i < m || j < n) { + if (i >= m) { +mergeList += ((temp.getValue(j), temp.getCounter(j))) +j = j + 1 + } else if (j >= n || getValue(i) <= temp.getValue(j)) { +mergeList += (data.apply(i)) +i = i + 1 + } else { +mergeList += ((temp.getValue(j), temp.getCounter(j))) +j = j + 1 + } +} +// the size will be brought to capacity while constructing the new histogram itself +val finalLower = Math.min(lower, temp.lower) +val finalUpper = Math.max(upper, temp.upper) +val ret = new ContinuousHistogram(B, finalLower, finalUpper) +ret.loadData(mergeList.toArray) +ret + } + case default => +throw new RuntimeException("Only a continuous histogram is allowed to be merged with a " + "continuous histogram") + +} + } + + /** Returns the qth quantile of the histogram +* +* @param q Quantile value in (0,1) +* @return Value at quantile q +*/ + def quantile(q: Double): Double = { +require(bins > 0, "Histogram is empty") +require(q > 0 && q < 1, "Quantile must be between 0 and 1") +var total = 0 +for (i <- 0 to bins - 1) { + total = total + getCounter(i) +} +val wantedSum = (q * total).round.toInt +var currSum = count(getValue(0)) + +if (wantedSum < currSum) { + require(lower > -MaxValue, "Set a lower bound before
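The merge loop in the quoted diff is a standard two-pointer merge of two sorted (value, count) arrays. A self-contained sketch of just that step, independent of the PR's class (the name `mergeBins` is an assumption for illustration):

```scala
// Two-pointer merge of sorted (value, count) bins; on ties the left array's
// entry is taken first, mirroring the quoted ContinuousHistogram.merge loop.
def mergeBins(a: Array[(Double, Int)], b: Array[(Double, Int)]): Array[(Double, Int)] = {
  val out = scala.collection.mutable.ArrayBuffer[(Double, Int)]()
  var i = 0
  var j = 0
  while (i < a.length || j < b.length) {
    if (i >= a.length) { out += b(j); j += 1 }                      // left exhausted
    else if (j >= b.length || a(i)._1 <= b(j)._1) { out += a(i); i += 1 } // take left
    else { out += b(j); j += 1 }                                    // take right
  }
  out.toArray
}
```

In the PR itself the merged list is then shrunk back to the target capacity B when the new histogram is constructed.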
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37137756 --- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala --- @@ -0,0 +1,343 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.ml.statistics + +import scala.Double.MaxValue +import scala.collection.mutable + +/** Implementation of a continuous valued online histogram + * Adapted from Ben-Haim and Yom-Tov's Streaming Decision Tree Algorithm + * Refer http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf + * + * =Parameters= + * -[[capacity]]: + * Number of bins to be used in the histogram + * + * -[[min]]: + * Lower limit on the elements + * + * -[[max]]: + * Upper limit on the elements + */ +case class ContinuousHistogram(capacity: Int, min: Double, max: Double) extends OnlineHistogram { + + require(capacity 0, Capacity should be a positive integer) + require(lower upper, Lower must be less than upper) + + val data = new mutable.ArrayBuffer[(Double, Int)]() + + /** Adds a new item to the histogram +* +* @param p value to be added +*/ + override def add(p: Double): Unit = { +require(p lower p upper, p + not in ( + lower + , + upper + )) +// search for the index where the value is just higher than p +val search = find(p) +// add the new value there, shifting everything to the right +data.insert(search, (p, 1)) +// If we're over capacity or any two elements are within 1e-9 of each other, merge. 
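The `add` path quoted above (binary-search for the insertion index, insert a unit bin, then re-compress) can be sketched independently; `addPoint` is a hypothetical name, and the PR's `find` presumably performs the same search:

```scala
import scala.collection.mutable.ArrayBuffer

// Insert value p into a sorted (value, count) bin list as a fresh bin of
// count 1, at the first index whose value exceeds p (binary search).
def addPoint(bins: ArrayBuffer[(Double, Int)], p: Double): Unit = {
  var lo = 0
  var hi = bins.size
  while (lo < hi) {
    val mid = (lo + hi) / 2
    if (bins(mid)._1 <= p) lo = mid + 1 else hi = mid
  }
  bins.insert(lo, (p, 1))
  // A real implementation would now merge bins closer than some epsilon
  // (the PR uses 1e-9) and re-compress if over capacity.
}
```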
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137749

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala ---
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137752

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala ---
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137762

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala ---
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137774

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala ---

+  private [statistics] var _counts: mutable.HashMap[Double,Int] = _

--- End diff --

Just for cosmetics, `private[statistics]` would be better to match the current coding style.
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137811

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala ---

+  private def exception(checkFor: FieldType) = {
+    if (fieldType == checkFor) {
+      throw new RuntimeException("Invalid access of data. Check field types.")
+    }
+  }

--- End diff --

Maybe if we check `FieldType` in each method (`min`, `max`, `mean`, ..., `gini`) with the `Predef.assume` method, the user can understand what the problem is more properly.

For example, when the user calls the `gini` method for a continuous field, we can throw an exception with a detailed message such as "Gini impurity for a continuous histogram is not supported.".
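That suggestion could look like the following sketch, using `Predef.require` (which throws `IllegalArgumentException` with the given message); the class and field names here are simplified stand-ins for the PR's `FieldStats`, not its actual API:

```scala
// Guard each statistic accessor with a descriptive message instead of a
// generic "Invalid access of data" exception.
sealed trait FieldType
case object DISCRETE extends FieldType
case object CONTINUOUS extends FieldType

class FieldStatsSketch(val fieldType: FieldType, counts: Map[Double, Int] = Map.empty) {
  def gini: Double = {
    require(fieldType == DISCRETE, "Gini impurity for a continuous field is not supported")
    val total = counts.values.sum.toDouble
    1 - counts.valuesIterator.map(c => c.toDouble * c).sum / (total * total)
  }
}
```

`require` surfaces the reason directly in the stack trace, e.g. `requirement failed: Gini impurity for a continuous field is not supported`, rather than forcing the user to inspect the field type themselves.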
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137819

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala ---

+  private [statistics] def setContinuousParameters(

--- End diff --

Also `private[statistics]` would be better in here.
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137821

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala ---

+  private [statistics] def setDiscreteParameters(counts: mutable.HashMap[Double,Int]): Unit = {

--- End diff --

Also `private[statistics]` would be better in here.
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137856

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala ---
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37137866

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import org.apache.flink.ml.statistics.FieldType._
+
+import scala.collection.mutable
+
+/** Class to represent Field statistics.
+  *
+  * =Parameters=
+  * -[[fieldType]]:
+  *   Type of this field, [[DISCRETE]] or [[CONTINUOUS]]
+  *
+  * For [[DISCRETE]] fields, [[entropy]], [[gini]] and [[categoryCounts]] are provided.
+  * For [[CONTINUOUS]] fields, [[min]], [[max]], [[mean]] and [[variance]] are provided.
+  *
+  */
+class FieldStats(val fieldType: FieldType) extends Serializable {
+  // field parameters
+  private [statistics] var _min: Double = _
+  private [statistics] var _max: Double = _
+  private [statistics] var _mean: Double = _
+  private [statistics] var _variance: Double = _
+  private [statistics] var _counts: mutable.HashMap[Double,Int] = _
+
+  // Access methods //
+  def min: Double = {
+    exception(DISCRETE)
+    _min
+  }
+
+  def max: Double = {
+    exception(DISCRETE)
+    _max
+  }
+
+  def mean: Double = {
+    exception(DISCRETE)
+    _mean
+  }
+
+  def variance: Double = {
+    exception(DISCRETE)
+    _variance
+  }
+
+  def categoryCounts: mutable.HashMap[Double,Int] = {
+    exception(CONTINUOUS)
+    _counts
+  }
+
+  /**
+   * Returns the entropy value for this [[DISCRETE]] field.
+   */
+  def entropy: Double = {
+    exception(CONTINUOUS)
+    val total: Double = _counts.valuesIterator.sum
+    3.322 * _counts.iterator.map(x => -(x._2 / total) * Math.log10(x._2 / total)).sum
+  }
+
+  /**
+   * Returns the Gini impurity for this [[DISCRETE]] field.
+   */
+  def gini: Double = {
+    exception(CONTINUOUS)
+    val total: Double = _counts.valuesIterator.sum
+    1 - _counts.iterator.map(x => (x._2 * x._2)).sum / (total * total)
--- End diff --

Unnecessary parentheses in `(x._2 * x._2)`.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
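For reference, a standalone sketch of the Gini computation under discussion, with the flagged parentheses dropped; `counts` stands in for the class's `_counts` map (a hypothetical free function, not the PR's actual accessor):

```scala
// Gini impurity = 1 - sum over categories of p_c^2, where p_c is the
// fraction of elements in category c.
def gini(counts: Map[Double, Int]): Double = {
  val total: Double = counts.valuesIterator.sum
  // parentheses around x._2 * x._2 dropped, as the reviewer suggests
  1 - counts.valuesIterator.map(c => c.toDouble * c).sum / (total * total)
}
```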
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37137965

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import org.apache.flink.ml.statistics.FieldType._
+
+import scala.collection.mutable
+
+/** Class to represent Field statistics.
+  *
+  * =Parameters=
+  * -[[fieldType]]:
+  *   Type of this field, [[DISCRETE]] or [[CONTINUOUS]]
+  *
+  * For [[DISCRETE]] fields, [[entropy]], [[gini]] and [[categoryCounts]] are provided.
+  * For [[CONTINUOUS]] fields, [[min]], [[max]], [[mean]] and [[variance]] are provided.
+  *
+  */
+class FieldStats(val fieldType: FieldType) extends Serializable {
+  // field parameters
+  private [statistics] var _min: Double = _
+  private [statistics] var _max: Double = _
+  private [statistics] var _mean: Double = _
+  private [statistics] var _variance: Double = _
+  private [statistics] var _counts: mutable.HashMap[Double,Int] = _
+
+  // Access methods //
+  def min: Double = {
+    exception(DISCRETE)
+    _min
+  }
+
+  def max: Double = {
+    exception(DISCRETE)
+    _max
+  }
+
+  def mean: Double = {
+    exception(DISCRETE)
+    _mean
+  }
+
+  def variance: Double = {
+    exception(DISCRETE)
+    _variance
+  }
+
+  def categoryCounts: mutable.HashMap[Double,Int] = {
+    exception(CONTINUOUS)
+    _counts
+  }
+
+  /**
+   * Returns the entropy value for this [[DISCRETE]] field.
+   */
+  def entropy: Double = {
+    exception(CONTINUOUS)
+    val total: Double = _counts.valuesIterator.sum
+    3.322 * _counts.iterator.map(x => -(x._2 / total) * Math.log10(x._2 / total)).sum
--- End diff --

Is 3.322 used for changing base of log from 10 to 2? Why don't you use `Math.log(~~~) / Math.log(2)`? Saving `Math.log(2)` into an immutable class member is good for performance.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
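A sketch of the reviewer's suggestion applied: compute base-2 entropy exactly via `math.log(p) / math.log(2)`, caching `log(2)` once, instead of the `3.322 ≈ 1 / log10(2)` approximation. `counts` here stands in for the class's `_counts` map; the object wrapper is only for the cached constant and is not the PR's code:

```scala
object EntropySketch {
  // cached once, per the reviewer's performance suggestion
  private val LogOf2 = math.log(2)

  // Shannon entropy in bits: -sum(p * log2(p)) over category probabilities p
  def entropy(counts: Map[Double, Int]): Double = {
    val total: Double = counts.valuesIterator.sum
    counts.valuesIterator.map { c =>
      val p = c / total
      -p * math.log(p) / LogOf2
    }.sum
  }
}
```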
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37138026

--- Diff: flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import scala.Double.MaxValue
+import scala.collection.mutable
+
+/** Implementation of a continuous valued online histogram
+  * Adapted from Ben-Haim and Yom-Tov's Streaming Decision Tree Algorithm
+  * Refer http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
+  *
+  * =Parameters=
+  * -[[capacity]]:
+  *   Number of bins to be used in the histogram
+  *
+  * -[[min]]:
+  *   Lower limit on the elements
+  *
+  * -[[max]]:
+  *   Upper limit on the elements
+  */
+case class ContinuousHistogram(capacity: Int, min: Double, max: Double) extends OnlineHistogram {
+
+  require(capacity > 0, "Capacity should be a positive integer")
+  require(lower < upper, "Lower must be less than upper")
+
+  val data = new mutable.ArrayBuffer[(Double, Int)]()
+
+  /** Adds a new item to the histogram
+    *
+    * @param p value to be added
+    */
+  override def add(p: Double): Unit = {
+    require(p > lower && p < upper, p + " not in (" + lower + "," + upper + ")")
+    // search for the index where the value is just higher than p
+    val search = find(p)
+    // add the new value there, shifting everything to the right
+    data.insert(search, (p, 1))
+    // If we're over capacity or any two elements are within 1e-9 of each other, merge.
+    // This will take care of the case if p was actually equal to some value in the histogram and
+    // just increment the value there
+    mergeElements()
+  }
+
+  /** Merges the histogram with h and returns a histogram with capacity B
+    *
+    * @param h histogram to be merged
+    * @param B capacity of the resultant histogram
+    * @return Merged histogram with capacity B
+    */
+  override def merge(h: OnlineHistogram, B: Int): ContinuousHistogram = {
+    h match {
+      case temp: ContinuousHistogram => {
+        val m: Int = bins
+        val n: Int = temp.bins
+        var i, j: Int = 0
+        val mergeList = new mutable.ArrayBuffer[(Double, Int)]()
+        while (i < m || j < n) {
+          if (i >= m) {
+            mergeList += ((temp.getValue(j), temp.getCounter(j)))
+            j = j + 1
+          } else if (j >= n || getValue(i) <= temp.getValue(j)) {
+            mergeList += (data.apply(i))
--- End diff --

Unnecessary parenthesis

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
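The loop under review is a two-pointer merge of two bin lists that are each sorted by value, as in the merge phase of merge sort. A self-contained sketch of just that step (the PR then shrinks the result back to capacity when the merged histogram is constructed, which this sketch omits):

```scala
// Merge two bin lists sorted by value into one sorted list, repeatedly
// taking the smaller head. Bins are (value, count) pairs.
def mergeBins(a: Vector[(Double, Int)], b: Vector[(Double, Int)]): Vector[(Double, Int)] = {
  val out = Vector.newBuilder[(Double, Int)]
  var i = 0
  var j = 0
  while (i < a.length || j < b.length) {
    if (i >= a.length) { out += b(j); j += 1 }            // a exhausted: drain b
    else if (j >= b.length || a(i)._1 <= b(j)._1) {        // a's head is smaller (or b exhausted)
      out += a(i); i += 1
    } else { out += b(j); j += 1 }                         // b's head is smaller
  }
  out.result()
}
```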
[jira] [Resolved] (FLINK-2512) Add client.close() before throw RuntimeException
[ https://issues.apache.org/jira/browse/FLINK-2512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-2512. - Resolution: Fixed Fix Version/s: 0.10 Fixed via c2b1eb7961ed16b8985d7f8c02e87c0964b818e8 Add client.close() before throw RuntimeException Key: FLINK-2512 URL: https://issues.apache.org/jira/browse/FLINK-2512 Project: Flink Issue Type: Bug Components: flink-contrib Affects Versions: 0.8.1 Reporter: fangfengbin Assignee: fangfengbin Priority: Minor Fix For: 0.10 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2512) Add client.close() before throw RuntimeException
[ https://issues.apache.org/jira/browse/FLINK-2512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-2512. --- Thank you for the contribution Add client.close() before throw RuntimeException Key: FLINK-2512 URL: https://issues.apache.org/jira/browse/FLINK-2512 Project: Flink Issue Type: Bug Components: flink-contrib Affects Versions: 0.8.1 Reporter: fangfengbin Assignee: fangfengbin Priority: Minor Fix For: 0.10 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2512]Add client.close() before throw Ru...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/1009 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37138063

--- Diff: docs/libs/ml/statistics.md ---
@@ -0,0 +1,108 @@
+---
+mathjax: include
+htmlTitle: FlinkML - Statistics
+title: <a href="../ml">FlinkML</a> - Statistics
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+ The statistics utility provides features such as building histograms over data, determining
+ mean, variance, gini impurity, entropy etc. of data.
+
+## Methods
+
+ The Statistics utility provides two major functions: `createHistogram` and `dataStats`.
+
+### Creating a histogram
+
+ There are two types of histograms:
+ <ul>
+  <li>
+   <strong>Continuous Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
+   when the values in `X` are from a continuous range. These histograms support
+   `quantile` and `sum` operations. Here `quantile(q)` refers to a value $x_q$ such that $|x: x
+   \leq x_q| = q * |X|$. Further, `sum(s)` refers to the number of elements $x \leq s$, which can
+   be construed as a cumulative probability value at $s$ [Of course, <i>scaled</i> probability].
+   <br>
+   A continuous histogram can be formed by calling `X.createHistogram(b)` where `b` is the
+   number of bins.
+  </li>
+  <li>
+   <strong>Categorical Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
+   when the values in `X` are from a discrete distribution. These histograms
+   support a `count(c)` operation which returns the number of elements associated with category `c`.
+   <br>
+   A categorical histogram can be formed by calling `X.createHistogram(0)`.
+  </li>
+ </ul>
+
+### Data Statistics
+
+ The `dataStats` function operates on a data set `X: DataSet[Vector]` and returns column-wise
+ statistics for `X`. Every field of `X` is allowed to be defined as either <i>discrete</i> or
+ <i>continuous</i>.
+ <br>
+ Statistics can be evaluated by calling `DataStats.dataStats(X)` or
+ `DataStats.dataStats(X, discreteFields)`. The latter is used when some fields need to be
+ declared discrete-valued, and is provided as an array of indices of the fields which are discrete.
+ <br>
+ The following information is available as part of `DataStats`:
+ <ul>
+  <li>Number of elements in `X`</li>
+  <li>Dimension of `X`</li>
+  <li>Column-wise statistics where for discrete fields, we report counts for each category, and
+   the Gini impurity and Entropy of the field, while for continuous fields, we report the
+   minimum, maximum, mean and variance.
+  </li>
+ </ul>
+
+## Examples
+
+{% highlight scala %}
+
+import org.apache.flink.ml.statistics._
+import org.apache.flink.ml._
+
+val X: DataSet[Double] = ...
+// Create continuous histogram
+val histogram = X.createHistogram(5) // creates a histogram with five bins
+histogram.quantile(0.3) // returns the 30th quantile
+histogram.sum(4) // returns number of elements less than 4
+
+// Create categorical histogram
+val histogram = X.createHistogram(0) // creates a categorical histogram
+histogram.count(3) // number of elements with cateogory value 3
--- End diff --

cateogory -> category

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on a diff in the pull request: https://github.com/apache/flink/pull/861#discussion_r37138093

--- Diff: docs/libs/ml/statistics.md ---
@@ -0,0 +1,108 @@
+---
+mathjax: include
+htmlTitle: FlinkML - Statistics
+title: <a href="../ml">FlinkML</a> - Statistics
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+ The statistics utility provides features such as building histograms over data, determining
+ mean, variance, gini impurity, entropy etc. of data.
+
+## Methods
+
+ The Statistics utility provides two major functions: `createHistogram` and `dataStats`.
+
+### Creating a histogram
+
+ There are two types of histograms:
+ <ul>
+  <li>
+   <strong>Continuous Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
+   when the values in `X` are from a continuous range. These histograms support
+   `quantile` and `sum` operations. Here `quantile(q)` refers to a value $x_q$ such that $|x: x
+   \leq x_q| = q * |X|$. Further, `sum(s)` refers to the number of elements $x \leq s$, which can
+   be construed as a cumulative probability value at $s$ [Of course, <i>scaled</i> probability].
+   <br>
+   A continuous histogram can be formed by calling `X.createHistogram(b)` where `b` is the
+   number of bins.
+  </li>
+  <li>
+   <strong>Categorical Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
+   when the values in `X` are from a discrete distribution. These histograms
+   support a `count(c)` operation which returns the number of elements associated with category `c`.
+   <br>
+   A categorical histogram can be formed by calling `X.createHistogram(0)`.
+  </li>
+ </ul>
+
+### Data Statistics
+
+ The `dataStats` function operates on a data set `X: DataSet[Vector]` and returns column-wise
+ statistics for `X`. Every field of `X` is allowed to be defined as either <i>discrete</i> or
+ <i>continuous</i>.
+ <br>
+ Statistics can be evaluated by calling `DataStats.dataStats(X)` or
+ `DataStats.dataStats(X, discreteFields)`. The latter is used when some fields need to be
+ declared discrete-valued, and is provided as an array of indices of the fields which are discrete.
+ <br>
+ The following information is available as part of `DataStats`:
+ <ul>
+  <li>Number of elements in `X`</li>
+  <li>Dimension of `X`</li>
+  <li>Column-wise statistics where for discrete fields, we report counts for each category, and
+   the Gini impurity and Entropy of the field, while for continuous fields, we report the
+   minimum, maximum, mean and variance.
+  </li>
+ </ul>
+
+## Examples
+
+{% highlight scala %}
+
+import org.apache.flink.ml.statistics._
+import org.apache.flink.ml._
+
+val X: DataSet[Double] = ...
+// Create continuous histogram
+val histogram = X.createHistogram(5) // creates a histogram with five bins
+histogram.quantile(0.3) // returns the 30th quantile
+histogram.sum(4) // returns number of elements less than 4
--- End diff --

Is this example valid? `RichDoubleDataSet.createHistogram` returns `DataSet[OnlineHistogram]` and it doesn't have methods such as `quantile`, `sum`, etc.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well.
If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set
[ https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698399#comment-14698399 ] Vasia Kalavri commented on FLINK-2527: -- I think (3) would break the model semantics. I'm leaning towards (1). [~ggevay] do you have any case in mind that (2) would allow to implement but (1) wouldn't? If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set - Key: FLINK-2527 URL: https://issues.apache.org/jira/browse/FLINK-2527 Project: Flink Issue Type: Bug Components: Gelly Reporter: Gabor Gevay Assignee: Gabor Gevay Fix For: 0.10, 0.9.1 The problem is that if setNewVertexValue is called more than once, it sends each new value to the out Collector, and these all end up in the workset, but then the coGroups in the two descendants of MessagingUdfWithEdgeValues use only the first value in the state Iterable. I see three ways to resolve this: 1. Add it to the documentation that setNewVertexValue should only be called once, and optionally add a check for this. 2. In setNewVertexValue, do not send the newValue to the out Collector at once, but only record it in outVal, and send the last recorded value after updateVertex returns. 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need some documentation addition.) I like 2. the best. What are your opinions? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
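Option (2) from the issue description can be sketched in plain Scala without any Gelly dependency: `setNewVertexValue` only records the value, and the last recorded value is emitted once after the user's `updateVertex` returns. All names here are illustrative, not Gelly's actual API:

```scala
// Buffer for option (2): record the most recent value, emit it exactly once.
class VertexUpdateBuffer[T] {
  private var pending: Option[T] = None

  // Called by the user function; records the value instead of sending it
  // straight to the output collector.
  def setNewVertexValue(value: T): Unit = {
    pending = Some(value)
  }

  // Called by the framework after updateVertex returns: emit the last
  // recorded value, if any, then reset for the next vertex.
  def flush(collect: T => Unit): Unit = {
    pending.foreach(collect)
    pending = None
  }
}
```

This guarantees at most one value per vertex reaches the workset, matching the semantics the coGroup downstream expects.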
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...
Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/861#issuecomment-131425060 Hi @sachingoel0101, Thanks for your contribution. I reviewed this PR and commented on the source code. There are some problems which aren't covered by the inline comments. In the documentation, there are many lines containing `a`, `ul`, `li`, `br`, `strong` and `i` tags. These can be replaced with markdown syntax. And some code (`MLUtils.createHistogram`, `ColumnStatistics`) is not formatted. I haven't finished reviewing the histogram-building process. After reading the full source code and the given paper, I'll add comments about the process. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request: [FLINK-2526]Add try-catch for task when it sto...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1020#issuecomment-131390701 The pull request #1017 actually fixes this as well, together with a lot of other things (similar problems are in many places). Can you have a look at #1017 and see if that would solve your concern? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2526) Add catch{} for task when it stop running
[ https://issues.apache.org/jira/browse/FLINK-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698306#comment-14698306 ] ASF GitHub Bot commented on FLINK-2526: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1020#issuecomment-131390701 The pull request #1017 actually fixes this as well, together with a lot of other things (similar problems are in many places). Can you have a look at #1017 and see if that would solve your concern? Add catch{} for task when it stop running -- Key: FLINK-2526 URL: https://issues.apache.org/jira/browse/FLINK-2526 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.8.1 Reporter: fangfengbin Assignee: fangfengbin Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-2486) Remove unwanted null check in removeInstance function
[ https://issues.apache.org/jira/browse/FLINK-2486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-2486. - Resolution: Won't Fix Valid sanity check, won't fix. Remove unwanted null check in removeInstance function - Key: FLINK-2486 URL: https://issues.apache.org/jira/browse/FLINK-2486 Project: Flink Issue Type: Bug Components: Scheduler Affects Versions: 0.8.1 Reporter: fangfengbin Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2486) Remove unwanted null check in removeInstance function
[ https://issues.apache.org/jira/browse/FLINK-2486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-2486. --- Remove unwanted null check in removeInstance function - Key: FLINK-2486 URL: https://issues.apache.org/jira/browse/FLINK-2486 Project: Flink Issue Type: Bug Components: Scheduler Affects Versions: 0.8.1 Reporter: fangfengbin Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (FLINK-2516) Remove unwanted log.isInfoEnabled check
[ https://issues.apache.org/jira/browse/FLINK-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-2516. - Resolution: Won't Fix As per discussion in the issue, this will not be changed. Remove unwanted log.isInfoEnabled check --- Key: FLINK-2516 URL: https://issues.apache.org/jira/browse/FLINK-2516 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 0.8.1 Reporter: fangfengbin Assignee: fangfengbin Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Closed] (FLINK-2516) Remove unwanted log.isInfoEnabled check
[ https://issues.apache.org/jira/browse/FLINK-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen closed FLINK-2516. --- Remove unwanted log.isInfoEnabled check --- Key: FLINK-2516 URL: https://issues.apache.org/jira/browse/FLINK-2516 Project: Flink Issue Type: Bug Components: Distributed Runtime Affects Versions: 0.8.1 Reporter: fangfengbin Assignee: fangfengbin Priority: Minor -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs
[ https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698413#comment-14698413 ] ASF GitHub Bot commented on FLINK-2462: --- Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1017#issuecomment-131431977 Fixed the issues with the tests. Builds locally, waiting for Travis to confirm. Wrong exception reporting in streaming jobs --- Key: FLINK-2462 URL: https://issues.apache.org/jira/browse/FLINK-2462 Project: Flink Issue Type: Bug Components: Streaming Affects Versions: 0.10 Reporter: Stephan Ewen Assignee: Stephan Ewen Priority: Blocker Fix For: 0.10 When streaming tasks are fail and are canceled, they report a plethora of followup exceptions. The batch operators have a clear model that makes sure that root causes are reported, and followup exceptions are not reported. That makes debugging much easier. A big part of that is to have a single consistent place that logs exceptions, and that has a view of whether the operation is still running, or whether it has been canceled. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...
Github user StephanEwen commented on the pull request: https://github.com/apache/flink/pull/1017#issuecomment-131431977 Fixed the issues with the tests. Builds locally, waiting for Travis to confirm. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set
[ https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698423#comment-14698423 ] Gabor Gevay commented on FLINK-2527: I'm also leaning towards (1) now. I have actually implemented (2) in the meantime, but then I realized that it sets a subtle trap for the user, that I immediately fell into :) In my user function, I have a loop over the msgs, and for each msg I decide to set some new vertex value or not. This loop might set a new value multiple times, and the last one should be retained. At first, I liked (2) better, because if we have (1), then I essentially have to implement (2) inside my user function anyway. I thought that this situation is probably a common one, so why have everyone reimplement (2) inside their user functions, if we can do it in Gelly? However, the trap is that my code inside the loop implicitly assumed that the setNewVertexValue function updates the vertex variable (the first parameter of the UDF), but it does not. Of course, we could make the setNewVertexValue do this update, but this is getting complicated. So it is probably just best to go with (1), to keep the API nice and simple. If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set - Key: FLINK-2527 URL: https://issues.apache.org/jira/browse/FLINK-2527 Project: Flink Issue Type: Bug Components: Gelly Reporter: Gabor Gevay Assignee: Gabor Gevay Fix For: 0.10, 0.9.1 The problem is that if setNewVertexValue is called more than once, it sends each new value to the out Collector, and these all end up in the workset, but then the coGroups in the two descendants of MessagingUdfWithEdgeValues use only the first value in the state Iterable. I see three ways to resolve this: 1. Add it to the documentation that setNewVertexValue should only be called once, and optionally add a check for this. 2. 
In setNewVertexValue, do not send the newValue to the out Collector at once, but only record it in outVal, and send the last recorded value after updateVertex returns. 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need some documentation addition.) I like 2. the best. What are your opinions? -- This message was sent by Atlassian JIRA (v6.3.4#6332)