[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37141049
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import org.apache.flink.ml.statistics.FieldType._
+
+import scala.collection.mutable
+
+/** Class to represent Field statistics.
+  *
+  * =Parameters=
+  * -[[fieldType]]:
+  *   Type of this field, [[DISCRETE]] or [[CONTINUOUS]]
+  *
+  * For [[DISCRETE]] fields, [[entropy]], [[gini]] and [[categoryCounts]] 
are provided.
+  * For [[CONTINUOUS]] fields, [[min]], [[max]], [[mean]] and [[variance]] 
are provided.
+  *
+  */
+class FieldStats(val fieldType: FieldType) extends Serializable {
+  // field parameters
+  private [statistics] var _min: Double = _
+  private [statistics] var _max: Double = _
+  private [statistics] var _mean: Double = _
+  private [statistics] var _variance: Double = _
+  private [statistics] var _counts: mutable.HashMap[Double,Int] = _
+
+  //-- Access methods --//
+  def min: Double = {
+    exception(DISCRETE)
+    _min
+  }
+
+  def max: Double = {
+    exception(DISCRETE)
+    _max
+  }
+
+  def mean: Double = {
+    exception(DISCRETE)
+    _mean
+  }
+
+  def variance: Double = {
+    exception(DISCRETE)
+    _variance
+  }
+
+  def categoryCounts: mutable.HashMap[Double,Int] = {
+    exception(CONTINUOUS)
+    _counts
+  }
+
+  /**
+   * Returns the entropy value for this [[DISCRETE]] field.
+   */
+  def entropy: Double = {
+    exception(CONTINUOUS)
+    val total: Double = _counts.valuesIterator.sum
+    3.322 * _counts.iterator.map(x => -(x._2 / total) * Math.log10(x._2 / total)).sum
+  }
+
+  /**
+   * Returns the Gini impurity for this [[DISCRETE]] field.
+   */
+  def gini: Double = {
+    exception(CONTINUOUS)
+    val total: Double = _counts.valuesIterator.sum
+    1 - _counts.iterator.map(x => (x._2 * x._2)).sum / (total * total)
+  }
+
+  //-- Setter methods --//
+  private [statistics] def setContinuousParameters(
--- End diff --

Gini impurity should be accessible to the user even outside the package. It is 
a property of the field itself, and we're not using it ourselves anyway.
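For reference, the entropy in the diff above is computed in base 10 and rescaled to bits via the constant 3.322 (approximately 1/log10 2). A minimal standalone sketch of both measures in Java, with hypothetical helper names (this is not the PR's Scala code):

```java
import java.util.HashMap;
import java.util.Map;

class ImpurityDemo {
    // Shannon entropy in bits: -sum(p * log2 p) over the category frequencies.
    static double entropy(Map<Double, Integer> counts) {
        double total = counts.values().stream().mapToInt(Integer::intValue).sum();
        return counts.values().stream()
                .mapToDouble(c -> -(c / total) * (Math.log(c / total) / Math.log(2)))
                .sum();
    }

    // Gini impurity: 1 - sum(p^2) over the category frequencies.
    static double gini(Map<Double, Integer> counts) {
        double total = counts.values().stream().mapToInt(Integer::intValue).sum();
        return 1 - counts.values().stream()
                .mapToDouble(c -> (c / total) * (c / total))
                .sum();
    }

    public static void main(String[] args) {
        Map<Double, Integer> counts = new HashMap<>();
        counts.put(0.0, 5);
        counts.put(1.0, 5);
        System.out.println(entropy(counts)); // 1.0 for a 50/50 split
        System.out.println(gini(counts));    // 0.5 for a 50/50 split
    }
}
```

A uniform two-category split maximizes both measures, which is a quick sanity check for any implementation.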


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2477) Add test for SocketClientSink

2015-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698510#comment-14698510
 ] 

ASF GitHub Bot commented on FLINK-2477:
---

Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/977#issuecomment-131473233
  
OK, I've done it.
Thank you!


 Add test for SocketClientSink
 -

 Key: FLINK-2477
 URL: https://issues.apache.org/jira/browse/FLINK-2477
 Project: Flink
  Issue Type: Test
  Components: Streaming
Affects Versions: 0.10
 Environment: win7 32bit;linux
Reporter: Huang Wei
Priority: Minor
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h

 Add some tests for SocketClientSink.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[GitHub] flink pull request: [FLINK-2477][Add]Add tests for SocketClientSin...

2015-08-15 Thread HuangWHWHW
Github user HuangWHWHW commented on the pull request:

https://github.com/apache/flink/pull/977#issuecomment-131473233
  
OK, I've done it.
Thank you!




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-131474758
  
Hi @chiwanpark, thanks for reviewing this. This was my first time working 
in Scala, so I hope you'll forgive the slight mistakes (oversights, perhaps?). 
I've tried to address most of your comments and left notes where I was not 
sure.




[jira] [Created] (FLINK-2528) MatchTaskTest failure

2015-08-15 Thread Sachin Goel (JIRA)
Sachin Goel created FLINK-2528:
--

 Summary: MatchTaskTest failure
 Key: FLINK-2528
 URL: https://issues.apache.org/jira/browse/FLINK-2528
 Project: Flink
  Issue Type: Bug
Reporter: Sachin Goel


MatchTaskTest fails with a NullPointerException. Here's the log: 
https://s3.amazonaws.com/archive.travis-ci.org/jobs/75780253/log.txt
and the relevant parts of the trace:
{code}
Exception in thread "Thread-154" java.lang.AssertionError: Canceling task 
failed: java.lang.NullPointerException
at 
org.apache.flink.runtime.operators.testutils.DriverTestBase.cancel(DriverTestBase.java:271)
at 
org.apache.flink.runtime.operators.testutils.TaskCancelThread.run(TaskCancelThread.java:60)

at org.junit.Assert.fail(Assert.java:88)
at 
org.apache.flink.runtime.operators.testutils.TaskCancelThread.run(TaskCancelThread.java:68)
{code}

{code}
Thread-153 prio=10 tid=0x7fc1e1338800 nid=0x5cd6 waiting on condition 
[0x7fc1d2b1]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
at java.lang.Thread.sleep(Native Method)
at 
org.apache.flink.runtime.operators.MatchTaskTest$MockDelayingMatchStub.join(MatchTaskTest.java:984)
at 
org.apache.flink.runtime.operators.MatchTaskTest$MockDelayingMatchStub.join(MatchTaskTest.java:978)
at 
org.apache.flink.runtime.operators.sort.AbstractMergeIterator.crossMwithNValues(AbstractMergeIterator.java:297)
at 
org.apache.flink.runtime.operators.sort.AbstractMergeIterator.crossMatchingGroup(AbstractMergeIterator.java:146)
at 
org.apache.flink.runtime.operators.sort.AbstractMergeInnerJoinIterator.callWithNextKey(AbstractMergeInnerJoinIterator.java:105)
at 
org.apache.flink.runtime.operators.JoinDriver.run(JoinDriver.java:208)
at 
org.apache.flink.runtime.operators.testutils.DriverTestBase.testDriverInternal(DriverTestBase.java:208)
at 
org.apache.flink.runtime.operators.testutils.DriverTestBase.testDriver(DriverTestBase.java:174)
at 
org.apache.flink.runtime.operators.MatchTaskTest$3.run(MatchTaskTest.java:520)
{code}





[jira] [Created] (FLINK-2529) fix on some unused code for flink-runtime

2015-08-15 Thread Huang Wei (JIRA)
Huang Wei created FLINK-2529:


 Summary: fix on some unused code for flink-runtime
 Key: FLINK-2529
 URL: https://issues.apache.org/jira/browse/FLINK-2529
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 0.10
Reporter: Huang Wei
Priority: Minor
 Fix For: 0.10








[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37141028
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import org.apache.flink.ml.statistics.FieldType._
+
+import scala.collection.mutable
+
+/** Class to represent Field statistics.
+  *
+  * =Parameters=
+  * -[[fieldType]]:
+  *   Type of this field, [[DISCRETE]] or [[CONTINUOUS]]
+  *
+  * For [[DISCRETE]] fields, [[entropy]], [[gini]] and [[categoryCounts]] 
are provided.
+  * For [[CONTINUOUS]] fields, [[min]], [[max]], [[mean]] and [[variance]] 
are provided.
+  *
+  */
+class FieldStats(val fieldType: FieldType) extends Serializable {
+  // field parameters
+  private [statistics] var _min: Double = _
+  private [statistics] var _max: Double = _
+  private [statistics] var _mean: Double = _
+  private [statistics] var _variance: Double = _
+  private [statistics] var _counts: mutable.HashMap[Double,Int] = _
+
--- End diff --

Could you clarify this point? They're already package private fields.




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread sachingoel0101
Github user sachingoel0101 commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37141076
  
--- Diff: docs/libs/ml/statistics.md ---
@@ -0,0 +1,108 @@
+---
+mathjax: include
+htmlTitle: FlinkML - Statistics
+title: <a href="../ml">FlinkML</a> - Statistics
+---
+!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+--
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+ The statistics utility provides features such as building histograms over data and determining
+ the mean, variance, Gini impurity, entropy, etc. of data.
+
+## Methods
+
+ The Statistics utility provides two major functions: `createHistogram` and `dataStats`.
+
+### Creating a histogram
+
+ There are two types of histograms:
+ <ul>
+  <li>
+   <strong>Continuous Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
+   when the values in `X` are from a continuous range. These histograms support
+   `quantile` and `sum` operations. Here `quantile(q)` refers to a value $x_q$ such that
+   $|\{x : x \leq x_q\}| = q \cdot |X|$. Further, `sum(s)` refers to the number of elements
+   $x \leq s$, which can be construed as a cumulative probability value at $s$ [a <i>scaled</i>
+   probability, of course].
+   <br>
+   A continuous histogram can be formed by calling `X.createHistogram(b)`, where `b` is the
+   number of bins.
+  </li>
+  <li>
+   <strong>Categorical Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
+   when the values in `X` are from a discrete distribution. These histograms support the
+   `count(c)` operation, which returns the number of elements associated with category `c`.
+   <br>
+   A categorical histogram can be formed by calling `X.createHistogram(0)`.
+  </li>
+ </ul>
+
+### Data Statistics
+
+ The `dataStats` function operates on a data set `X: DataSet[Vector]` and returns column-wise
+ statistics for `X`. Every field of `X` is allowed to be defined as either <i>discrete</i> or
+ <i>continuous</i>.
+ <br>
+ Statistics can be evaluated by calling `DataStats.dataStats(X)` or
+ `DataStats.dataStats(X, discreteFields)`. The latter is used when some fields need to be
+ declared discrete-valued; `discreteFields` is an array of indices of the fields which are discrete.
+ <br>
+ The following information is available as part of `DataStats`:
+ <ul>
+  <li>Number of elements in `X`</li>
+  <li>Dimension of `X`</li>
+  <li>Column-wise statistics: for discrete fields, we report counts for each category, along with
+   the Gini impurity and entropy of the field; for continuous fields, we report the
+   minimum, maximum, mean and variance.
+  </li>
+ </ul>
+
+## Examples
+
+{% highlight scala %}
+
+import org.apache.flink.ml.statistics._
+import org.apache.flink.ml._
+
+val X: DataSet[Double] = ...
+// Create continuous histogram
+val histogram = X.createHistogram(5) // creates a histogram with five bins
+histogram.quantile(0.3)  // returns the 0.3-quantile (30th percentile)
+histogram.sum(4) // returns the number of elements less than or equal to 4
--- End diff --

You're right. I'm wondering if we should provide two methods for this 
purpose. Otherwise, the user will need to cast to the appropriate class.
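For reference, the `quantile`/`sum` semantics discussed in this documentation diff can be illustrated on a plain sorted array. This is a sketch of the definitions only, with hypothetical method names, not the PR's histogram classes:

```java
import java.util.Arrays;

class HistogramSemantics {
    // quantile(q): smallest value x_q such that at least q * |X| elements are <= x_q.
    // Assumes the input array is sorted ascending.
    static double quantile(double[] sorted, double q) {
        int idx = (int) Math.ceil(q * sorted.length) - 1;
        return sorted[Math.max(idx, 0)];
    }

    // sum(s): number of elements <= s, i.e. an unnormalized cumulative count at s.
    static long sum(double[] data, double s) {
        return Arrays.stream(data).filter(x -> x <= s).count();
    }

    public static void main(String[] args) {
        double[] x = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        System.out.println(quantile(x, 0.5)); // 5.0: half the elements are <= 5
        System.out.println(sum(x, 4));        // 4: the elements 1, 2, 3, 4
    }
}
```

Dividing `sum(s)` by the data size gives the scaled cumulative probability described in the docs.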




[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

2015-08-15 Thread sachingoel0101
Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/1003#issuecomment-131477326
  
A stand-alone parameter server service will require setting up and tearing 
down of the client every time the user, say, opens and closes a Rich function 
while using it. Further, it means we have to add another dependency when the 
same could be accomplished using akka.
In this implementation, the parameter *server* is at every task manager 
[effectively it acts as a client to serve all running tasks at one node. In 
fact, in this sense, there are no servers, just clients at every worker, which 
are managed by the Job Manager]. This in itself means less data transfer over 
the network, since every *server* will usually be the owner of a key and can serve 
its *clients* faster instead of every request going over the network.
Further, it is completely distributed: every task manager maintains its 
own *server* and sets it up or tears it down along with itself.
As far as including it in the core itself is concerned, there isn't much to 
it: there are the 3-4 odd functions added directly to the Runtime context, 
which effectively serve as an interface.
@tillrohrmann, could you weigh in on whether this is the intended use of a PS 
in ML algorithms? I can easily see this working with, for example, the 
regression algorithm.
The reason I included it in the runtime is that there is no 
chance of failure now: if the TaskManager is alive, the Parameter Server at 
that client will be alive. Further, the Job Manager manages the servers and 
determines where each key will go [which will be crucial to recovery], 
something which can be very hard to determine in a completely decentralized 
manner (I couldn't think of a fool-proof way). This ensures that the server is 
running only on the workers where it's needed, and if it is needed. Keeping the 
Job Manager in the loop also ensures that recovery is easy. If a Task Manager 
fails, the Job Manager knows which server failed by matching the `InstanceID`s 
and can kick off the recovery process from the duplicate server. [This is not 
implemented yet.]
A stand-alone PS would add another master-node system in parallel to the 
JobManager-TaskManager system, which can already be used efficiently for this purpose. 
Of course, this doesn't matter if we use an external key-value store.
I will have a look at #967 and see how the two can be integrated.

I had a look at an open implementation done for Spark. 
https://github.com/apache/spark/compare/branch-1.3...chouqin:ps-on-spark-1.3
This adds a separate context and a function on RDD to access the PS and 
does require running a service inside the core environment.




[jira] [Commented] (FLINK-2529) fix on some unused code for flink-runtime

2015-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2529?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698529#comment-14698529
 ] 

ASF GitHub Bot commented on FLINK-2529:
---

GitHub user HuangWHWHW opened a pull request:

https://github.com/apache/flink/pull/1022

[FLINK-2529][runtime]remove some unused code

There are two changes:
1. The variable consumerGraph is never used in public Boolean call() throws Exception.
2. To my knowledge, Thread.currentThread() will never return null, so the 
check shutdownHook != null is unnecessary.
I'm not completely sure; maybe I'm wrong.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HuangWHWHW/flink FLINK-2529

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1022.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1022


commit f630fe9cf28ac734a472134d907e635693f00ad0
Author: HuangWHWHW 404823...@qq.com
Date:   2015-08-16T03:56:18Z

[FLINK-2529][runtime]remove some unused code




 fix on some unused code for flink-runtime
 -

 Key: FLINK-2529
 URL: https://issues.apache.org/jira/browse/FLINK-2529
 Project: Flink
  Issue Type: Improvement
  Components: Local Runtime
Affects Versions: 0.10
Reporter: Huang Wei
Priority: Minor
 Fix For: 0.10

   Original Estimate: 168h
  Remaining Estimate: 168h







[GitHub] flink pull request: [FLINK-2529][runtime]remove some unused code

2015-08-15 Thread HuangWHWHW
GitHub user HuangWHWHW opened a pull request:

https://github.com/apache/flink/pull/1022

[FLINK-2529][runtime]remove some unused code

There are two changes:
1. The variable consumerGraph is never used in public Boolean call() throws Exception.
2. To my knowledge, Thread.currentThread() will never return null, so the 
check shutdownHook != null is unnecessary.
I'm not completely sure; maybe I'm wrong.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HuangWHWHW/flink FLINK-2529

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1022.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1022


commit f630fe9cf28ac734a472134d907e635693f00ad0
Author: HuangWHWHW 404823...@qq.com
Date:   2015-08-16T03:56:18Z

[FLINK-2529][runtime]remove some unused code






[GitHub] flink pull request: [FLINK-2478]fix the array may have out of boun...

2015-08-15 Thread Rucongzhang
GitHub user Rucongzhang opened a pull request:

https://github.com/apache/flink/pull/1021

[FLINK-2478]fix the array may have out of bounds

In the getNestedDelta function, the array lengths of oldDatapoint and 
newDatapoint are not guaranteed to be the same, so I added a check and fixed 
the problem. Please merge this fix into the master branch. Thank you.
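The kind of guard described here, checking the two array lengths before computing a delta, can be sketched as follows. The method name and the Euclidean delta are illustrative assumptions, not the PR's actual code:

```java
class DeltaGuard {
    // Euclidean delta between two data points, guarding against mismatched
    // lengths instead of risking an ArrayIndexOutOfBoundsException.
    static double nestedDelta(double[] oldPoint, double[] newPoint) {
        if (oldPoint.length != newPoint.length) {
            throw new IllegalArgumentException(
                "Data points must have the same dimension: "
                + oldPoint.length + " vs " + newPoint.length);
        }
        double sum = 0;
        for (int i = 0; i < oldPoint.length; i++) {
            double d = oldPoint[i] - newPoint[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    public static void main(String[] args) {
        // A 3-4-5 triangle: delta between (3,0) and (0,4) is 5.
        System.out.println(nestedDelta(new double[]{3, 0}, new double[]{0, 4})); // 5.0
    }
}
```

Failing fast with a descriptive exception is usually preferable to silently truncating to the shorter array.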

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Rucongzhang/flink FLINK-2487

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1021.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1021


commit e68c86f4ce8ae2d1e2bac8ab432fc93b5f52f756
Author: Rucongzhang zhangruc...@huawei.com
Date:   2015-08-15T09:25:25Z

[FLINK-2478]fix the array may have out of bounds






[jira] [Commented] (FLINK-2478) The page “FlinkML - Machine Learning for Flink“ https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a dead link

2015-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698207#comment-14698207
 ] 

ASF GitHub Bot commented on FLINK-2478:
---

GitHub user Rucongzhang opened a pull request:

https://github.com/apache/flink/pull/1021

[FLINK-2478]fix the array may have out of bounds

In the getNestedDelta function, the array lengths of oldDatapoint and 
newDatapoint are not guaranteed to be the same, so I added a check and fixed 
the problem. Please merge this fix into the master branch. Thank you.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Rucongzhang/flink FLINK-2487

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1021.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1021


commit e68c86f4ce8ae2d1e2bac8ab432fc93b5f52f756
Author: Rucongzhang zhangruc...@huawei.com
Date:   2015-08-15T09:25:25Z

[FLINK-2478]fix the array may have out of bounds




 The page “FlinkML - Machine Learning for Flink“  
 https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a 
 dead link
 -

 Key: FLINK-2478
 URL: https://issues.apache.org/jira/browse/FLINK-2478
 Project: Flink
  Issue Type: Task
  Components: Documentation
Affects Versions: 0.10
Reporter: Slim Baltagi
Assignee: Till Rohrmann
Priority: Minor

 Note that FlinkML is currently not part of the binary distribution. See 
 linking with it for cluster execution here.
 'here' links to a dead link: 
 https://ci.apache.org/projects/flink/flink-docs-master/libs/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
 The correct link is: 
 https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution





[jira] [Created] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set

2015-08-15 Thread Gabor Gevay (JIRA)
Gabor Gevay created FLINK-2527:
--

 Summary: If a VertexUpdateFunction calls setNewVertexValue more 
than once, the MessagingFunction will only see the first value set
 Key: FLINK-2527
 URL: https://issues.apache.org/jira/browse/FLINK-2527
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 0.10, 0.9.1


The problem is that if setNewVertexValue is called more than once, it sends 
each new value to the out Collector, and these all end up in the workset, but 
then the coGroups in the two descendants of MessagingUdfWithEdgeValues use only 
the first value in the state Iterable. I see three ways to resolve this:

1. Add it to the documentation that setNewVertexValue should only be called 
once, and optionally add a check for this.
2. In setNewVertexValue, do not send the newValue to the out Collector at once, 
but only record it in outVal, and send the last recorded value after 
updateVertex returns.
3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup and 
MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need some 
documentation addition.)

I like option 2 best. What are your opinions?
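Option 2 can be sketched with simplified stand-in types (these are not Gelly's actual classes): the framework buffers whatever the user last set and emits exactly one value after updateVertex returns.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified sketch of option 2: setNewVertexValue only records the value;
// the framework emits the last recorded value once the user update returns.
class VertexUpdateSketch {
    private Double outVal;                            // last value set by the user
    private final List<Double> collected = new ArrayList<>();

    public void setNewVertexValue(double value) {
        outVal = value;                               // record, do not collect yet
    }

    // Framework-side driver: run the user's update, then emit once.
    public void runUpdate(Runnable userUpdateVertex) {
        outVal = null;
        userUpdateVertex.run();
        if (outVal != null) {
            collected.add(outVal);                    // exactly one value per vertex update
        }
    }

    public List<Double> collected() {
        return collected;
    }

    public static void main(String[] args) {
        VertexUpdateSketch s = new VertexUpdateSketch();
        // The user calls setNewVertexValue twice; only the last value survives.
        s.runUpdate(() -> { s.setNewVertexValue(1.0); s.setNewVertexValue(2.0); });
        System.out.println(s.collected()); // [2.0]
    }
}
```

This keeps the workset free of stale intermediate values without changing the user-facing API.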





[jira] [Commented] (FLINK-2526) Add catch{} for task when it stop running

2015-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698160#comment-14698160
 ] 

ASF GitHub Bot commented on FLINK-2526:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/1020#discussion_r37134398
  
--- Diff: 
flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
 ---
@@ -100,9 +100,27 @@ public void invoke() throws Exception {
this.isRunning = false;
// Cleanup
inputProcessor.clearBuffers();
-   inputProcessor.cleanup();
-   outputHandler.flushOutputs();
-   clearBuffers();
+
+   try {
+   inputProcessor.cleanup();
+   }
+   catch (Exception e) {
+   LOG.warn("Clean up input processor failed.");
--- End diff --

the exception message shouldn't be discarded.
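The suggested pattern, wrapping each cleanup step in its own try/catch so later steps still run while logging the exception rather than discarding it, can be sketched generically (using `java.util.logging` as a stand-in for Flink's logger):

```java
import java.util.logging.Level;
import java.util.logging.Logger;

class CleanupSketch {
    private static final Logger LOG = Logger.getLogger(CleanupSketch.class.getName());

    interface Step { void run() throws Exception; }

    // Runs every cleanup step even if earlier ones fail; returns the number
    // of failed steps. Passing the exception to the logger keeps both the
    // message and the stack trace.
    static int runCleanup(Step... steps) {
        int failed = 0;
        for (Step step : steps) {
            try {
                step.run();
            } catch (Exception e) {
                failed++;
                LOG.log(Level.WARNING, "Cleanup step failed.", e);
            }
        }
        return failed;
    }

    public static void main(String[] args) {
        int failed = runCleanup(
            () -> { throw new java.io.IOException("input processor cleanup failed"); },
            () -> System.out.println("flushOutputs still ran")
        );
        System.out.println(failed + " step(s) failed"); // 1 step(s) failed
    }
}
```

With SLF4J (as Flink uses), the equivalent is the two-argument form `LOG.warn("message", e)`, which also preserves the stack trace.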


 Add catch{} for task when it stop running 
 --

 Key: FLINK-2526
 URL: https://issues.apache.org/jira/browse/FLINK-2526
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor







[jira] [Commented] (FLINK-2526) Add catch{} for task when it stop running

2015-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698179#comment-14698179
 ] 

ASF GitHub Bot commented on FLINK-2526:
---

Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1020#issuecomment-131314931
  
@zentol Hello! Thank you for your suggestion. I have added the exception 
message to the log.


 Add catch{} for task when it stop running 
 --

 Key: FLINK-2526
 URL: https://issues.apache.org/jira/browse/FLINK-2526
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor







[GitHub] flink pull request: [FLINK-2526]Add try-catch for task when it sto...

2015-08-15 Thread ffbin
Github user ffbin commented on the pull request:

https://github.com/apache/flink/pull/1020#issuecomment-131314931
  
@zentol Hello! Thank you for your suggestion. I have added the exception 
message to the log.




[jira] [Created] (FLINK-2526) Add catch{} for task when it stop running

2015-08-15 Thread fangfengbin (JIRA)
fangfengbin created FLINK-2526:
--

 Summary: Add catch{} for task when it stop running 
 Key: FLINK-2526
 URL: https://issues.apache.org/jira/browse/FLINK-2526
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor








[GitHub] flink pull request: Some updates for programming_guide.md

2015-08-15 Thread chenliang613
GitHub user chenliang613 opened a pull request:

https://github.com/apache/flink/pull/1019

Some updates for programming_guide.md

Some updates for the section Program Skeleton of programming_guide.md

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/chenliang613/flink patch-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1019.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1019


commit a0b17ead729511a856609a6ac80033d30f1dce71
Author: CHEN LIANG chenliang...@sina.cn
Date:   2015-08-15T07:04:08Z

Some updates for programming_guide.md

Some updates for the section Program Skeleton of programming_guide.md






[GitHub] flink pull request: [FLINK-2526]Add try-catch for task when it sto...

2015-08-15 Thread ffbin
GitHub user ffbin opened a pull request:

https://github.com/apache/flink/pull/1020

[FLINK-2526]Add try-catch for task when it stop running

inputProcessor.cleanup() may throw an IOException. If we do not catch it, the 
next line, outputHandler.flushOutputs(), will not run, causing output data 
loss. So I think adding a try-catch is necessary.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ffbin/flink FLINK-2526

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1020.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1020


commit f846f56198c921f9285bce4fb25392d9d6e2a827
Author: ffbin 869218...@qq.com
Date:   2015-08-15T06:40:16Z

[FLINK-2526]Add catch{} for task when it stop running






[jira] [Commented] (FLINK-2526) Add catch{} for task when it stop running

2015-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698153#comment-14698153
 ] 

ASF GitHub Bot commented on FLINK-2526:
---

GitHub user ffbin opened a pull request:

https://github.com/apache/flink/pull/1020

[FLINK-2526]Add try-catch for task when it stop running

inputProcessor.cleanup() may throw an IOException. If it is not caught, the next 
line, outputHandler.flushOutputs(), will not be executed and output data will be 
lost. So I think adding a try-catch is necessary.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ffbin/flink FLINK-2526

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/1020.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1020


commit f846f56198c921f9285bce4fb25392d9d6e2a827
Author: ffbin 869218...@qq.com
Date:   2015-08-15T06:40:16Z

[FLINK-2526]Add catch{} for task when it stop running




 Add catch{} for task when it stop running 
 --

 Key: FLINK-2526
 URL: https://issues.apache.org/jira/browse/FLINK-2526
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-2334) IOException: Channel to path could not be opened

2015-08-15 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698362#comment-14698362
 ] 

Stephan Ewen commented on FLINK-2334:
-

Is this problem actually reproducible, or was this a one-time occurrence?

 IOException: Channel to path could not be opened
 

 Key: FLINK-2334
 URL: https://issues.apache.org/jira/browse/FLINK-2334
 Project: Flink
  Issue Type: Bug
Affects Versions: 0.9
 Environment: local and cluster environment; Linux and MacOS
Reporter: David Heller
Priority: Minor

 We've encountered an IOException due to missing temporary files (see 
 stacktrace at the bottom). It occurred both in local and cluster execution 
 and on MacOS as well as on linux. Data size does not seem to be the reason: 
 the error occurred on a 6.5GB dataset as well as on a small 400MB dataset.
 Our code uses Bulk iterations and the suspicion is that cached build-side 
 files are accidentally removed too early. As far as we observed it, the 
 exception always happens in an iteration later than the first one (mostly 
 iteration 2).
 Interestingly, on one occasion the error disappeared consistently when 
 setting the number of maximum iterations higher (from 2 to 6).
 On another occasion, the exception appeared when adding a simple map operator 
 at the end (holding the identity function).
 Generally, the error is quite hard to reproduce.
 The stacktrace:
 java.io.IOException: Channel to path 
 '/var/folders/xx/0dd3w4jd7fbb4ytmhqxm157hgn/T/flink-io-f5061483-ff59-43dc-883f-79af813d5804/19a70637e025c7ee3919b30239060895.23.channel'
  could not be opened.
   at 
 org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.<init>(AbstractFileIOChannel.java:61)
   at 
 org.apache.flink.runtime.io.disk.iomanager.AsynchronousFileIOChannel.<init>(AsynchronousFileIOChannel.java:86)
   at 
 org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.<init>(AsynchronousBulkBlockReader.java:46)
   at 
 org.apache.flink.runtime.io.disk.iomanager.AsynchronousBulkBlockReader.<init>(AsynchronousBulkBlockReader.java:39)
   at 
 org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync.createBulkBlockChannelReader(IOManagerAsync.java:263)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.buildTableFromSpilledPartition(MutableHashTable.java:751)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.prepareNextPartition(MutableHashTable.java:508)
   at 
 org.apache.flink.runtime.operators.hash.ReOpenableMutableHashTable.prepareNextPartition(ReOpenableMutableHashTable.java:167)
   at 
 org.apache.flink.runtime.operators.hash.MutableHashTable.nextRecord(MutableHashTable.java:544)
   at 
 org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashMatchIterator.callWithNextKey(NonReusingBuildFirstHashMatchIterator.java:104)
   at 
 org.apache.flink.runtime.operators.AbstractCachedBuildSideMatchDriver.run(AbstractCachedBuildSideMatchDriver.java:155)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
   at 
 org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:139)
   at 
 org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask.run(IterationIntermediatePactTask.java:92)
   at 
 org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362)
   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
   at java.lang.Thread.run(Thread.java:745)
 Caused by: java.io.FileNotFoundException: 
 /var/folders/xx/0dd3w4jd7fbb4ytmhqxm157hgn/T/flink-io-f5061483-ff59-43dc-883f-79af813d5804/19a70637e025c7ee3919b30239060895.23.channel
  (No such file or directory)
   at java.io.RandomAccessFile.open0(Native Method)
   at java.io.RandomAccessFile.open(RandomAccessFile.java:316)
   at java.io.RandomAccessFile.<init>(RandomAccessFile.java:243)
   at java.io.RandomAccessFile.<init>(RandomAccessFile.java:124)
   at 
 org.apache.flink.runtime.io.disk.iomanager.AbstractFileIOChannel.<init>(AbstractFileIOChannel.java:57)
   ... 16 more





[jira] [Commented] (FLINK-2478) The page “FlinkML - Machine Learning for Flink“ https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a dead link

2015-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698363#comment-14698363
 ] 

ASF GitHub Bot commented on FLINK-2478:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1021#issuecomment-131402990
  
If the two arrays are of different lengths, should this throw an exception? 
After all, the cosine distance is not well defined then and returning 0 may 
give the impression that things are correct, when they are not.


 The page “FlinkML - Machine Learning for Flink“  
 https://ci.apache.org/projects/flink/flink-docs-master/libs/ml/ contains a 
 dead link
 -

 Key: FLINK-2478
 URL: https://issues.apache.org/jira/browse/FLINK-2478
 Project: Flink
  Issue Type: Task
  Components: Documentation
Affects Versions: 0.10
Reporter: Slim Baltagi
Assignee: Till Rohrmann
Priority: Minor

 Note that FlinkML is currently not part of the binary distribution. See 
 linking with it for cluster execution here.
 'here' links to a dead link: 
 https://ci.apache.org/projects/flink/flink-docs-master/libs/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
 The correct link is: 
 https://ci.apache.org/projects/flink/flink-docs-master/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution





[GitHub] flink pull request: [FLINK-2478]fix the array may have out of boun...

2015-08-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1021#issuecomment-131402990
  
If the two arrays are of different lengths, should this throw an exception? 
After all, the cosine distance is not well defined then and returning 0 may 
give the impression that things are correct, when they are not.




[GitHub] flink pull request: Some updates for programming_guide.md

2015-08-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1019#issuecomment-131403119
  
Most of this is good, but in parts, it confuses the `DataSet` and 
`DataStream` API. The `DataSet` API has no functionality to write to sockets.




[GitHub] flink pull request: Stale Synchronous Parallel Iterations

2015-08-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/967#issuecomment-131403208
  
@nltran How would your use of the parameter server work together with what 
is proposed in #1003 ?




[GitHub] flink pull request: Parameter Server: Distributed Key-Value store,...

2015-08-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1003#issuecomment-131403660
  
This is again baked into the Flink runtime. Is there a way to keep this 
separate?

I am still a bit puzzled if this is the pattern with which people use 
parameter servers: embedded in the workers that operate on the training data. 
My impression so far was that the parameter servers are a separate component, 
usually running on different machines.

Also, could you comment on how this could work with the changes proposed in 
#967?




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137570
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/MLUtils.scala ---
@@ -119,4 +120,30 @@ object MLUtils {
 
 stringRepresentation.writeAsText(filePath)
   }
+
+  /** Create a [[DataSet]] of [[OnlineHistogram]] from the input data
+*
+* @param bins Number of bins required. Zero for 
[[CategoricalHistogram]]
+* @param data input [[DataSet]] of [[Double]]
+* @return [[DataSet]] of [[OnlineHistogram]]
+*/
+  private [ml] def createHistogram(data: DataSet[Double], bins: Int): 
DataSet[OnlineHistogram] = {
+val min = data.map(x => x).reduce((x,y) => Math.min(x,y))
+val max = data.map(x => x).reduce((x,y) => Math.max(x,y))
--- End diff --

We don't need the `map(x => x)` operation, and we need a space between `x,` and `y`.
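The simplification the comment asks for, illustrated on a plain Scala collection standing in for the DataSet (the real code would call reduce on the DataSet directly):

```scala
// A Seq stands in for DataSet[Double] here; the point is that the identity
// map(x => x) before reduce is redundant.
val data = Seq(3.0, 1.0, 2.0)
val min = data.reduce((x, y) => Math.min(x, y))
val max = data.reduce((x, y) => Math.max(x, y))
```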




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137605
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import scala.collection.mutable
+
+/** Implementation of a discrete valued online histogram
+  *
+  * =Parameters=
+  * -[[numCategories]]:
+  *   Number of categories in the histogram
+  */
+case class CategoricalHistogram(numCategories: Int) extends 
OnlineHistogram {
+
+  require(numCategories > 0, "Capacity must be greater than zero")
+  val data = new mutable.HashMap[Double, Int]()
+
+  /** Number of categories in the histogram
+*
+* @return number of categories
+*/
+  override def bins: Int = {
+numCategories
+  }
+
+  /** Increment count of category c
+*
+* @param c category whose count needs to be incremented
+*/
+  override def add(c: Double): Unit = {
+if (data.get(c).isEmpty) {
+  require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+  data.put(c, 1)
+} else {
+  data.update(c, data.get(c).get + 1)
+}
+  }
+
+  /** Merges the histogram with h and returns a new histogram
+*
+* @param h histogram to be merged
+* @param B number of categories in the resultant histogram.
+*  (Default: ```0```, number of categories will be the size of 
union of categories in
+*  both histograms)
+* @return Merged histogram with capacity B
+*/
+  override def merge(h: OnlineHistogram, B: Int = 0): CategoricalHistogram 
= {
+h match {
+  case h1: CategoricalHistogram => {
+val finalMap = new mutable.HashMap[Double, Int]()
+data.iterator.foreach(x => finalMap.put(x._1, x._2))
+h1.data.iterator.foreach(x => {
+  if (finalMap.get(x._1).isEmpty) {
+finalMap.put(x._1, x._2)
+  } else {
+finalMap.update(x._1, x._2 + finalMap.get(x._1).get)
+  }
+})
--- End diff --

As I said above, we can reduce the calls to `get(x._1)` by using pattern 
matching on `finalMap.get(x._1)`.
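The same pattern-matching idea applied to the merge loop, as a runnable sketch with illustrative data rather than the PR's actual code:

```scala
import scala.collection.mutable

// Merge category counts from `other` into `finalMap` with a single lookup
// per key, instead of calling get(x._1) several times per entry.
val finalMap = mutable.HashMap(1.0 -> 2, 2.0 -> 1)
val other = Map(1.0 -> 3, 3.0 -> 4)
other.foreach { case (k, v) =>
  finalMap.get(k) match {
    case None => finalMap.put(k, v)
    case Some(old) => finalMap.update(k, old + v)
  }
}
```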




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137592
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import scala.collection.mutable
+
+/** Implementation of a discrete valued online histogram
+  *
+  * =Parameters=
+  * -[[numCategories]]:
+  *   Number of categories in the histogram
+  */
+case class CategoricalHistogram(numCategories: Int) extends 
OnlineHistogram {
+
+  require(numCategories > 0, "Capacity must be greater than zero")
+  val data = new mutable.HashMap[Double, Int]()
+
+  /** Number of categories in the histogram
+*
+* @return number of categories
+*/
+  override def bins: Int = {
+numCategories
+  }
+
+  /** Increment count of category c
+*
+* @param c category whose count needs to be incremented
+*/
+  override def add(c: Double): Unit = {
+if (data.get(c).isEmpty) {
+  require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+  data.put(c, 1)
+} else {
+  data.update(c, data.get(c).get + 1)
+}
--- End diff --

If we use pattern matching on `data.get(c)`, we can avoid calling 
`data.get(c)` twice.

For example:

```scala
data.get(c) match {
  case None =>
require(data.size < numCategories, "Insufficient capacity. Failed to add.")
data.put(c, 1)
  case Some(v) => data.update(c, v + 1)
}
```




[GitHub] flink pull request: Some updates for programming_guide.md

2015-08-15 Thread chenliang613
Github user chenliang613 commented on the pull request:

https://github.com/apache/flink/pull/1019#issuecomment-131405427
  
ok, update the sockets part.




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137631
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import scala.collection.mutable
+
+/** Implementation of a discrete valued online histogram
+  *
+  * =Parameters=
+  * -[[numCategories]]:
+  *   Number of categories in the histogram
+  */
+case class CategoricalHistogram(numCategories: Int) extends 
OnlineHistogram {
+
+  require(numCategories > 0, "Capacity must be greater than zero")
+  val data = new mutable.HashMap[Double, Int]()
+
+  /** Number of categories in the histogram
+*
+* @return number of categories
+*/
+  override def bins: Int = {
+numCategories
+  }
+
+  /** Increment count of category c
+*
+* @param c category whose count needs to be incremented
+*/
+  override def add(c: Double): Unit = {
+if (data.get(c).isEmpty) {
+  require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+  data.put(c, 1)
+} else {
+  data.update(c, data.get(c).get + 1)
+}
+  }
+
+  /** Merges the histogram with h and returns a new histogram
+*
+* @param h histogram to be merged
+* @param B number of categories in the resultant histogram.
+*  (Default: ```0```, number of categories will be the size of 
union of categories in
+*  both histograms)
+* @return Merged histogram with capacity B
+*/
+  override def merge(h: OnlineHistogram, B: Int = 0): CategoricalHistogram 
= {
+h match {
+  case h1: CategoricalHistogram => {
+val finalMap = new mutable.HashMap[Double, Int]()
+data.iterator.foreach(x => finalMap.put(x._1, x._2))
+h1.data.iterator.foreach(x => {
+  if (finalMap.get(x._1).isEmpty) {
+finalMap.put(x._1, x._2)
+  } else {
+finalMap.update(x._1, x._2 + finalMap.get(x._1).get)
+  }
+})
+require(B == 0 || finalMap.size <= B, "Insufficient capacity. Failed to merge")
+val countBuffer = new mutable.ArrayBuffer[(Double, Int)]()
+finalMap.iterator.foreach(x => countBuffer += ((x._1, x._2)))
+var finalSize = finalMap.size
+if (B > 0) {
+  finalSize = B
+}
+val ret = new CategoricalHistogram(finalSize)
+ret.loadData(countBuffer.toArray)
--- End diff --

Why should the data in `finalMap` be copied to `countBuffer`?

```scala
val finalSize = if (B > 0) B else finalMap.size
val ret = new CategoricalHistogram(finalSize)
ret.loadData(finalMap.toArray)
```




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137647
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import scala.collection.mutable
+
+/** Implementation of a discrete valued online histogram
+  *
+  * =Parameters=
+  * -[[numCategories]]:
+  *   Number of categories in the histogram
+  */
+case class CategoricalHistogram(numCategories: Int) extends 
OnlineHistogram {
+
+  require(numCategories > 0, "Capacity must be greater than zero")
+  val data = new mutable.HashMap[Double, Int]()
+
+  /** Number of categories in the histogram
+*
+* @return number of categories
+*/
+  override def bins: Int = {
+numCategories
+  }
+
+  /** Increment count of category c
+*
+* @param c category whose count needs to be incremented
+*/
+  override def add(c: Double): Unit = {
+if (data.get(c).isEmpty) {
+  require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+  data.put(c, 1)
+} else {
+  data.update(c, data.get(c).get + 1)
+}
+  }
+
+  /** Merges the histogram with h and returns a new histogram
+*
+* @param h histogram to be merged
+* @param B number of categories in the resultant histogram.
+*  (Default: ```0```, number of categories will be the size of 
union of categories in
+*  both histograms)
+* @return Merged histogram with capacity B
+*/
+  override def merge(h: OnlineHistogram, B: Int = 0): CategoricalHistogram 
= {
+h match {
+  case h1: CategoricalHistogram => {
+val finalMap = new mutable.HashMap[Double, Int]()
+data.iterator.foreach(x => finalMap.put(x._1, x._2))
+h1.data.iterator.foreach(x => {
+  if (finalMap.get(x._1).isEmpty) {
+finalMap.put(x._1, x._2)
+  } else {
+finalMap.update(x._1, x._2 + finalMap.get(x._1).get)
+  }
+})
+require(B == 0 || finalMap.size <= B, "Insufficient capacity. Failed to merge")
+val countBuffer = new mutable.ArrayBuffer[(Double, Int)]()
+finalMap.iterator.foreach(x => countBuffer += ((x._1, x._2)))
+var finalSize = finalMap.size
+if (B > 0) {
+  finalSize = B
+}
+val ret = new CategoricalHistogram(finalSize)
+ret.loadData(countBuffer.toArray)
+ret
+  }
+  case default =>
+throw new RuntimeException("Only a categorical histogram is allowed to be merged with a " +
+  "categorical histogram")
+}
+  }
+
+  /** Number of elements in category c
+*
+* @return Number of points in category c
+*/
+  def count(c: Double): Int = {
+if (data.get(c).isEmpty) {
+  return 0
+}
+data.get(c).get
+  }
--- End diff --

I think the following is better than the current implementation.

```scala
def count(c: Double): Int = data.get(c) match {
  case None => 0
  case Some(v) => v
}
```




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137658
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/CategoricalHistogram.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import scala.collection.mutable
+
+/** Implementation of a discrete valued online histogram
+  *
+  * =Parameters=
+  * -[[numCategories]]:
+  *   Number of categories in the histogram
+  */
+case class CategoricalHistogram(numCategories: Int) extends 
OnlineHistogram {
+
+  require(numCategories > 0, "Capacity must be greater than zero")
+  val data = new mutable.HashMap[Double, Int]()
+
+  /** Number of categories in the histogram
+*
+* @return number of categories
+*/
+  override def bins: Int = {
+numCategories
+  }
+
+  /** Increment count of category c
+*
+* @param c category whose count needs to be incremented
+*/
+  override def add(c: Double): Unit = {
+if (data.get(c).isEmpty) {
+  require(data.size < numCategories, "Insufficient capacity. Failed to add.")
+  data.put(c, 1)
+} else {
+  data.update(c, data.get(c).get + 1)
+}
+  }
+
+  /** Merges the histogram with h and returns a new histogram
+*
+* @param h histogram to be merged
+* @param B number of categories in the resultant histogram.
+*  (Default: ```0```, number of categories will be the size of 
union of categories in
+*  both histograms)
+* @return Merged histogram with capacity B
+*/
+  override def merge(h: OnlineHistogram, B: Int = 0): CategoricalHistogram 
= {
+h match {
+  case h1: CategoricalHistogram => {
+val finalMap = new mutable.HashMap[Double, Int]()
+data.iterator.foreach(x => finalMap.put(x._1, x._2))
+h1.data.iterator.foreach(x => {
+  if (finalMap.get(x._1).isEmpty) {
+finalMap.put(x._1, x._2)
+  } else {
+finalMap.update(x._1, x._2 + finalMap.get(x._1).get)
+  }
+})
+require(B == 0 || finalMap.size <= B, "Insufficient capacity. Failed to merge")
+val countBuffer = new mutable.ArrayBuffer[(Double, Int)]()
+finalMap.iterator.foreach(x => countBuffer += ((x._1, x._2)))
+var finalSize = finalMap.size
+if (B > 0) {
+  finalSize = B
+}
+val ret = new CategoricalHistogram(finalSize)
+ret.loadData(countBuffer.toArray)
+ret
+  }
+  case default =>
+throw new RuntimeException("Only a categorical histogram is allowed to be merged with a " +
+  "categorical histogram")
+}
+  }
+
+  /** Number of elements in category c
+*
+* @return Number of points in category c
+*/
+  def count(c: Double): Int = {
+if (data.get(c).isEmpty) {
+  return 0
+}
+data.get(c).get
+  }
+
+  /** Returns the total number of elements in the histogram
+*
+* @return total number of elements added so far
+*/
+  override def total: Int = {
+var result = 0
+data.valuesIterator.foreach(x = result += x)
+result
+  }
--- End diff --

We can implement the `total` method without a mutable variable.

```scala
override def total: Int = data.valuesIterator.sum
```




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137718
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala
 ---
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import scala.Double.MaxValue
+import scala.collection.mutable
+
+/** Implementation of a continuous valued online histogram
+  * Adapted from Ben-Haim and Yom-Tov's Streaming Decision Tree Algorithm
+  * Refer http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
+  *
+  * =Parameters=
+  * -[[capacity]]:
+  *   Number of bins to be used in the histogram
+  *
+  * -[[min]]:
+  *   Lower limit on the elements
+  *
+  * -[[max]]:
+  *   Upper limit on the elements
+  */
+case class ContinuousHistogram(capacity: Int, min: Double, max: Double) extends OnlineHistogram {
+
+  require(capacity > 0, "Capacity should be a positive integer")
+  require(lower < upper, "Lower must be less than upper")
+
+  val data = new mutable.ArrayBuffer[(Double, Int)]()
+
+  /** Adds a new item to the histogram
+    *
+    * @param p value to be added
+    */
+  override def add(p: Double): Unit = {
+    require(p > lower && p < upper, p + " not in (" + lower + "," + upper + ")")
+    // search for the index where the value is just higher than p
+    val search = find(p)
+    // add the new value there, shifting everything to the right
+    data.insert(search, (p, 1))
+    // If we're over capacity or any two elements are within 1e-9 of each other, merge.
+    // This will take care of the case if p was actually equal to some value in the histogram and
+    // just increment the value there
+    mergeElements()
+  }
+
+  /** Merges the histogram with h and returns a histogram with capacity B
+    *
+    * @param h histogram to be merged
+    * @param B capacity of the resultant histogram
+    * @return Merged histogram with capacity B
+    */
+  override def merge(h: OnlineHistogram, B: Int): ContinuousHistogram = {
+    h match {
+      case temp: ContinuousHistogram => {
+        val m: Int = bins
+        val n: Int = temp.bins
+        var i, j: Int = 0
+        val mergeList = new mutable.ArrayBuffer[(Double, Int)]()
+        while (i < m || j < n) {
+          if (i >= m) {
+            mergeList += ((temp.getValue(j), temp.getCounter(j)))
+            j = j + 1
+          } else if (j >= n || getValue(i) <= temp.getValue(j)) {
+            mergeList += (data.apply(i))
+            i = i + 1
+          } else {
+            mergeList += ((temp.getValue(j), temp.getCounter(j)))
+            j = j + 1
+          }
+        }
+        // the size will be brought to capacity while constructing the new histogram itself
+        val finalLower = Math.min(lower, temp.lower)
+        val finalUpper = Math.max(upper, temp.upper)
+        val ret = new ContinuousHistogram(B, finalLower, finalUpper)
+        ret.loadData(mergeList.toArray)
+        ret
+      }
+      case default =>
+        throw new RuntimeException("Only a continuous histogram is allowed to be merged with a " +
+          "continuous histogram")
+    }
+  }
+
+  /** Returns the qth quantile of the histogram
+    *
+    * @param q Quantile value in (0,1)
+    * @return Value at quantile q
+    */
+  def quantile(q: Double): Double = {
+    require(bins > 0, "Histogram is empty")
+    require(q > 0 && q < 1, "Quantile must be between 0 and 1")
+    var total = 0
+    for (i <- 0 to bins - 1) {
+      total = total + getCounter(i)
+    }
+    val wantedSum = (q * total).round.toInt
+    var currSum = count(getValue(0))
+
+    if (wantedSum < currSum) {
+      require(lower > -MaxValue, "Set a lower bound before 

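For context, the bin-reduction step behind the quoted `merge`/`mergeElements` logic follows Ben-Haim and Yom-Tov's scheme. A minimal sketch, with assumed helper names and not the PR's actual implementation:

```scala
// Sketch of the Ben-Haim & Yom-Tov bin merge (hypothetical standalone code):
// concatenate two sorted (value, count) bin lists, then repeatedly fuse the
// closest adjacent pair into its count-weighted mean until only B bins remain.
def mergeBins(a: Seq[(Double, Int)], b: Seq[(Double, Int)], B: Int): Seq[(Double, Int)] = {
  val buf = (a ++ b).sortBy(_._1).toBuffer
  while (buf.size > B) {
    // find the adjacent pair with the smallest value gap
    val i = (0 until buf.size - 1).minBy(k => buf(k + 1)._1 - buf(k)._1)
    val (v1, c1) = buf(i)
    val (v2, c2) = buf(i + 1)
    // replace the pair by its weighted mean, summing the counts
    buf(i) = ((v1 * c1 + v2 * c2) / (c1 + c2), c1 + c2)
    buf.remove(i + 1)
  }
  buf.toSeq
}

// e.g. mergeBins(Seq((1.0, 1), (5.0, 1)), Seq((1.5, 3)), 2)
```

Collapsing to the weighted mean keeps the total count and the approximate mass distribution intact, which is why the merged histogram can still answer quantile queries.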
[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137756
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala
 ---

[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137749
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala
 ---

[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137752
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala
 ---

[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137762
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala
 ---

[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137774
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import org.apache.flink.ml.statistics.FieldType._
+
+import scala.collection.mutable
+
+/** Class to represent Field statistics.
+  *
+  * =Parameters=
+  * -[[fieldType]]:
+  *   Type of this field, [[DISCRETE]] or [[CONTINUOUS]]
+  *
+  * For [[DISCRETE]] fields, [[entropy]], [[gini]] and [[categoryCounts]] 
are provided.
+  * For [[CONTINUOUS]] fields, [[min]], [[max]], [[mean]] and [[variance]] 
are provided.
+  *
+  */
+class FieldStats(val fieldType: FieldType) extends Serializable {
+  // field parameters
+  private [statistics] var _min: Double = _
+  private [statistics] var _max: Double = _
+  private [statistics] var _mean: Double = _
+  private [statistics] var _variance: Double = _
+  private [statistics] var _counts: mutable.HashMap[Double,Int] = _
+
--- End diff --

Just a cosmetic note: `private[statistics]` would better match the current 
coding style.




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137811
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala
 ---
@@ -0,0 +1,119 @@
+class FieldStats(val fieldType: FieldType) extends Serializable {
+  // field parameters
+  private [statistics] var _min: Double = _
+  private [statistics] var _max: Double = _
+  private [statistics] var _mean: Double = _
+  private [statistics] var _variance: Double = _
+  private [statistics] var _counts: mutable.HashMap[Double,Int] = _
+
+  // Access methods //
+  def min: Double = {
+    exception(DISCRETE)
+    _min
+  }
+
+  def max: Double = {
+    exception(DISCRETE)
+    _max
+  }
+
+  def mean: Double = {
+    exception(DISCRETE)
+    _mean
+  }
+
+  def variance: Double = {
+    exception(DISCRETE)
+    _variance
+  }
+
+  def categoryCounts: mutable.HashMap[Double,Int] = {
+    exception(CONTINUOUS)
+    _counts
+  }
+
+  /**
+   * Returns the entropy value for this [[DISCRETE]] field.
+   */
+  def entropy: Double = {
+    exception(CONTINUOUS)
+    val total: Double = _counts.valuesIterator.sum
+    3.322 * _counts.iterator.map(x => -(x._2 / total) * Math.log10(x._2 / total)).sum
+  }
+
+  /**
+   * Returns the Gini impurity for this [[DISCRETE]] field.
+   */
+  def gini: Double = {
+    exception(CONTINUOUS)
+    val total: Double = _counts.valuesIterator.sum
+    1 - _counts.iterator.map(x => (x._2 * x._2)).sum / (total * total)
+  }
+
+  //-- Setter methods --//
+  private [statistics] def setContinuousParameters(
+      min: Double,
+      max: Double,
+      mean: Double,
+      variance: Double)
+    : Unit = {
+    exception(DISCRETE)
+    _min = min
+    _max = max
+    _mean = mean
+    _variance = variance
+  }
+
+  private [statistics] def setDiscreteParameters(counts: mutable.HashMap[Double,Int]): Unit = {
+    exception(CONTINUOUS)
+    _counts = counts
+  }
+
+  private def exception(checkFor: FieldType) = {
+    if (fieldType == checkFor) {
+      throw new RuntimeException("Invalid access of data. Check field types.")
+    }
+  }
--- End diff --

Maybe if we check `FieldType` in each method (`min`, `max`, `mean`, ..., 
`gini`) with the `Predef.assume` method, the user can understand the 
problem more easily. For example, when the user calls the `gini` method on a 
continuous field, we can throw an exception with a detailed message such as 
"Gini impurity for a continuous field is not supported.".
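As a sketch of that suggestion (hypothetical names and a standalone class, not the PR's actual code), the per-method check and message could look like:

```scala
// Hypothetical sketch: validate the field type in each accessor with
// Predef.assume so the failure message names the unsupported operation.
object FieldKind extends Enumeration {
  val Discrete, Continuous = Value
}

class FieldStatsSketch(kind: FieldKind.Value, counts: Map[Double, Int]) {
  /** Gini impurity, defined only for discrete fields. */
  def gini: Double = {
    // Throws AssertionError with a descriptive message on misuse.
    assume(kind == FieldKind.Discrete,
      "Gini impurity for a continuous field is not supported.")
    val total = counts.values.sum.toDouble
    1 - counts.values.map(c => c.toDouble * c).sum / (total * total)
  }
}

// Two equally likely categories give Gini impurity 1 - 2 * 0.25 = 0.5.
val g = new FieldStatsSketch(FieldKind.Discrete, Map(0.0 -> 5, 1.0 -> 5)).gini
```

Unlike the single generic `exception` helper, `assume` carries the failure reason directly to the caller.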




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137819
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala
 ---
+  //-- Setter methods --//
+  private [statistics] def setContinuousParameters(
--- End diff --

`private[statistics]` would also be better here.




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137821
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala
 ---
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.statistics
+
+import org.apache.flink.ml.statistics.FieldType._
+
+import scala.collection.mutable
+
+/** Class to represent Field statistics.
+  *
+  * =Parameters=
+  * -[[fieldType]]:
+  *   Type of this field, [[DISCRETE]] or [[CONTINUOUS]]
+  *
+  * For [[DISCRETE]] fields, [[entropy]], [[gini]] and [[categoryCounts]] 
are provided.
+  * For [[CONTINUOUS]] fields, [[min]], [[max]], [[mean]] and [[variance]] 
are provided.
+  *
+  */
+class FieldStats(val fieldType: FieldType) extends Serializable {
+  // field parameters
+  private [statistics] var _min: Double = _
+  private [statistics] var _max: Double = _
+  private [statistics] var _mean: Double = _
+  private [statistics] var _variance: Double = _
+  private [statistics] var _counts: mutable.HashMap[Double,Int] = _
+
+  // Access methods //
+  def min: Double = {
+    exception(DISCRETE)
+    _min
+  }
+
+  def max: Double = {
+    exception(DISCRETE)
+    _max
+  }
+
+  def mean: Double = {
+    exception(DISCRETE)
+    _mean
+  }
+
+  def variance: Double = {
+    exception(DISCRETE)
+    _variance
+  }
+
+  def categoryCounts: mutable.HashMap[Double,Int] = {
+    exception(CONTINUOUS)
+    _counts
+  }
+
+  /**
+   * Returns the entropy value for this [[DISCRETE]] field.
+   */
+  def entropy: Double = {
+    exception(CONTINUOUS)
+    val total: Double = _counts.valuesIterator.sum
+    3.322 * _counts.iterator.map(x => -(x._2 / total) * Math.log10(x._2 / total)).sum
+  }
+
+  /**
+   * Returns the Gini impurity for this [[DISCRETE]] field.
+   */
+  def gini: Double = {
+    exception(CONTINUOUS)
+    val total: Double = _counts.valuesIterator.sum
+    1 - _counts.iterator.map(x => (x._2 * x._2)).sum / (total * total)
+  }
+
+  //-- Setter methods --//
+  private [statistics] def setContinuousParameters(
+      min: Double,
+      max: Double,
+      mean: Double,
+      variance: Double)
+    : Unit = {
+    exception(DISCRETE)
+    _min = min
+    _max = max
+    _mean = mean
+    _variance = variance
+  }
+
+  private [statistics] def setDiscreteParameters(counts: mutable.HashMap[Double,Int]): Unit = {
--- End diff --

Also `private[statistics]` would be better in here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137856
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala
 ---
@@ -0,0 +1,343 @@
+
+package org.apache.flink.ml.statistics
+
+import scala.Double.MaxValue
+import scala.collection.mutable
+
+/** Implementation of a continuous valued online histogram
+  * Adapted from Ben-Haim and Yom-Tov's Streaming Decision Tree Algorithm
+  * Refer http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
+  *
+  * =Parameters=
+  * -[[capacity]]:
+  *   Number of bins to be used in the histogram
+  *
+  * -[[min]]:
+  *   Lower limit on the elements
+  *
+  * -[[max]]:
+  *   Upper limit on the elements
+  */
+case class ContinuousHistogram(capacity: Int, min: Double, max: Double) extends OnlineHistogram {
+
+  require(capacity > 0, "Capacity should be a positive integer")
+  require(lower < upper, "Lower must be less than upper")
+
+  val data = new mutable.ArrayBuffer[(Double, Int)]()
+
+  /** Adds a new item to the histogram
+    *
+    * @param p value to be added
+    */
+  override def add(p: Double): Unit = {
+    require(p > lower && p < upper, p + " not in (" + lower + "," + upper + ")")
+    // search for the index where the value is just higher than p
+    val search = find(p)
+    // add the new value there, shifting everything to the right
+    data.insert(search, (p, 1))
+    // If we're over capacity or any two elements are within 1e-9 of each other, merge.
+    // This will take care of the case if p was actually equal to some value in the histogram and
+    // just increment the value there
+    mergeElements()
+  }
+
+  /** Merges the histogram with h and returns a histogram with capacity B
+    *
+    * @param h histogram to be merged
+    * @param B capacity of the resultant histogram
+    * @return Merged histogram with capacity B
+    */
+  override def merge(h: OnlineHistogram, B: Int): ContinuousHistogram = {
+    h match {
+      case temp: ContinuousHistogram => {
+        val m: Int = bins
+        val n: Int = temp.bins
+        var i, j: Int = 0
+        val mergeList = new mutable.ArrayBuffer[(Double, Int)]()
+        while (i < m || j < n) {
+          if (i >= m) {
+            mergeList += ((temp.getValue(j), temp.getCounter(j)))
+            j = j + 1
+          } else if (j >= n || getValue(i) <= temp.getValue(j)) {
+            mergeList += (data.apply(i))
+            i = i + 1
+          } else {
+            mergeList += ((temp.getValue(j), temp.getCounter(j)))
+            j = j + 1
+          }
+        }
+        // the size will be brought to capacity while constructing the new histogram itself
+        val finalLower = Math.min(lower, temp.lower)
+        val finalUpper = Math.max(upper, temp.upper)
+        val ret = new ContinuousHistogram(B, finalLower, finalUpper)
+        ret.loadData(mergeList.toArray)
+        ret
+      }
+      case default =>
+        throw new RuntimeException("Only a continuous histogram is allowed to be merged with a " +
+          "continuous histogram")
+    }
+  }
+
+  /** Returns the qth quantile of the histogram
+    *
+    * @param q Quantile value in (0,1)
+    * @return Value at quantile q
+    */
+  def quantile(q: Double): Double = {
+    require(bins > 0, "Histogram is empty")
+    require(q > 0 && q < 1, "Quantile must be between 0 and 1")
+    var total = 0
+    for (i <- 0 to bins - 1) {
+      total = total + getCounter(i)
+    }
+    val wantedSum = (q * total).round.toInt
+    var currSum = count(getValue(0))
+
+    if (wantedSum < currSum) {
+      require(lower > -MaxValue, "Set a lower bound before

[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137866
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala
 ---
@@ -0,0 +1,119 @@
+
+package org.apache.flink.ml.statistics
+
+import org.apache.flink.ml.statistics.FieldType._
+
+import scala.collection.mutable
+
+/** Class to represent Field statistics.
+  *
+  * =Parameters=
+  * -[[fieldType]]:
+  *   Type of this field, [[DISCRETE]] or [[CONTINUOUS]]
+  *
+  * For [[DISCRETE]] fields, [[entropy]], [[gini]] and [[categoryCounts]] are provided.
+  * For [[CONTINUOUS]] fields, [[min]], [[max]], [[mean]] and [[variance]] are provided.
+  *
+  */
+class FieldStats(val fieldType: FieldType) extends Serializable {
+  // field parameters
+  private [statistics] var _min: Double = _
+  private [statistics] var _max: Double = _
+  private [statistics] var _mean: Double = _
+  private [statistics] var _variance: Double = _
+  private [statistics] var _counts: mutable.HashMap[Double,Int] = _
+
+  // Access methods //
+  def min: Double = {
+    exception(DISCRETE)
+    _min
+  }
+
+  def max: Double = {
+    exception(DISCRETE)
+    _max
+  }
+
+  def mean: Double = {
+    exception(DISCRETE)
+    _mean
+  }
+
+  def variance: Double = {
+    exception(DISCRETE)
+    _variance
+  }
+
+  def categoryCounts: mutable.HashMap[Double,Int] = {
+    exception(CONTINUOUS)
+    _counts
+  }
+
+  /**
+   * Returns the entropy value for this [[DISCRETE]] field.
+   */
+  def entropy: Double = {
+    exception(CONTINUOUS)
+    val total: Double = _counts.valuesIterator.sum
+    3.322 * _counts.iterator.map(x => -(x._2 / total) * Math.log10(x._2 / total)).sum
+  }
+
+  /**
+   * Returns the Gini impurity for this [[DISCRETE]] field.
+   */
+  def gini: Double = {
+    exception(CONTINUOUS)
+    val total: Double = _counts.valuesIterator.sum
+    1 - _counts.iterator.map(x => (x._2 * x._2)).sum / (total * total)
--- End diff --

Unnecessary parentheses in `(x._2 * x._2)`.
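For reference, the quantity this method computes is the Gini impurity, $1 - \sum_k p_k^2$ over the class probabilities. A minimal standalone sketch (assuming the same category-to-count representation as `_counts`; names here are illustrative, not the PR's API):

```scala
object GiniSketch {
  /** Gini impurity of a category -> count map: 1 minus the sum of squared class probabilities. */
  def gini(counts: Map[Double, Int]): Double = {
    val total = counts.values.sum.toDouble
    // No parentheses needed around the product, as the review points out
    1 - counts.valuesIterator.map(c => c.toDouble * c).sum / (total * total)
  }

  def main(args: Array[String]): Unit = {
    println(gini(Map(0.0 -> 5, 1.0 -> 5))) // two balanced classes -> prints 0.5
    println(gini(Map(0.0 -> 10)))          // pure node -> prints 0.0
  }
}
```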




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37137965
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/FieldStats.scala
 ---
@@ -0,0 +1,119 @@
+
+package org.apache.flink.ml.statistics
+
+import org.apache.flink.ml.statistics.FieldType._
+
+import scala.collection.mutable
+
+/** Class to represent Field statistics.
+  *
+  * =Parameters=
+  * -[[fieldType]]:
+  *   Type of this field, [[DISCRETE]] or [[CONTINUOUS]]
+  *
+  * For [[DISCRETE]] fields, [[entropy]], [[gini]] and [[categoryCounts]] are provided.
+  * For [[CONTINUOUS]] fields, [[min]], [[max]], [[mean]] and [[variance]] are provided.
+  *
+  */
+class FieldStats(val fieldType: FieldType) extends Serializable {
+  // field parameters
+  private [statistics] var _min: Double = _
+  private [statistics] var _max: Double = _
+  private [statistics] var _mean: Double = _
+  private [statistics] var _variance: Double = _
+  private [statistics] var _counts: mutable.HashMap[Double,Int] = _
+
+  // Access methods //
+  def min: Double = {
+    exception(DISCRETE)
+    _min
+  }
+
+  def max: Double = {
+    exception(DISCRETE)
+    _max
+  }
+
+  def mean: Double = {
+    exception(DISCRETE)
+    _mean
+  }
+
+  def variance: Double = {
+    exception(DISCRETE)
+    _variance
+  }
+
+  def categoryCounts: mutable.HashMap[Double,Int] = {
+    exception(CONTINUOUS)
+    _counts
+  }
+
+  /**
+   * Returns the entropy value for this [[DISCRETE]] field.
+   */
+  def entropy: Double = {
+    exception(CONTINUOUS)
+    val total: Double = _counts.valuesIterator.sum
+    3.322 * _counts.iterator.map(x => -(x._2 / total) * Math.log10(x._2 / total)).sum
--- End diff --

Is 3.322 used for changing base of log from 10 to 2? Why don't you use 
`Math.log(~~~) / Math.log(2)`? Saving `Math.log(2)` into an immutable class 
member is good for performance.
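A minimal sketch of the suggested change (hedged: `FieldStats` internals are simplified to a plain map here, and `Log2` is the cached constant the reviewer proposes, not existing code):

```scala
object EntropySketch {
  // Cached once, instead of multiplying by the 3.322 base-change approximation
  private val Log2 = math.log(2)

  /** Shannon entropy (base 2) of a category -> count map. */
  def entropy(counts: Map[Double, Int]): Double = {
    val total = counts.values.sum.toDouble
    counts.valuesIterator.map { c =>
      val p = c / total
      -p * (math.log(p) / Log2)
    }.sum
  }

  def main(args: Array[String]): Unit = {
    // Two equally likely categories -> exactly 1 bit of entropy
    println(entropy(Map(0.0 -> 5, 1.0 -> 5))) // prints 1.0
  }
}
```

Besides avoiding the hard-coded 3.322, `math.log(p) / Log2` is exact rather than a four-digit approximation of $1/\log_{10} 2$.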




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37138026
  
--- Diff: 
flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/statistics/ContinuousHistogram.scala
 ---
@@ -0,0 +1,343 @@
+
+package org.apache.flink.ml.statistics
+
+import scala.Double.MaxValue
+import scala.collection.mutable
+
+/** Implementation of a continuous valued online histogram
+  * Adapted from Ben-Haim and Yom-Tov's Streaming Decision Tree Algorithm
+  * Refer http://www.jmlr.org/papers/volume11/ben-haim10a/ben-haim10a.pdf
+  *
+  * =Parameters=
+  * -[[capacity]]:
+  *   Number of bins to be used in the histogram
+  *
+  * -[[min]]:
+  *   Lower limit on the elements
+  *
+  * -[[max]]:
+  *   Upper limit on the elements
+  */
+case class ContinuousHistogram(capacity: Int, min: Double, max: Double) extends OnlineHistogram {
+
+  require(capacity > 0, "Capacity should be a positive integer")
+  require(lower < upper, "Lower must be less than upper")
+
+  val data = new mutable.ArrayBuffer[(Double, Int)]()
+
+  /** Adds a new item to the histogram
+    *
+    * @param p value to be added
+    */
+  override def add(p: Double): Unit = {
+    require(p > lower && p < upper, p + " not in (" + lower + "," + upper + ")")
+    // search for the index where the value is just higher than p
+    val search = find(p)
+    // add the new value there, shifting everything to the right
+    data.insert(search, (p, 1))
+    // If we're over capacity or any two elements are within 1e-9 of each other, merge.
+    // This will take care of the case if p was actually equal to some value in the histogram and
+    // just increment the value there
+    mergeElements()
+  }
+
+  /** Merges the histogram with h and returns a histogram with capacity B
+    *
+    * @param h histogram to be merged
+    * @param B capacity of the resultant histogram
+    * @return Merged histogram with capacity B
+    */
+  override def merge(h: OnlineHistogram, B: Int): ContinuousHistogram = {
+    h match {
+      case temp: ContinuousHistogram => {
+        val m: Int = bins
+        val n: Int = temp.bins
+        var i, j: Int = 0
+        val mergeList = new mutable.ArrayBuffer[(Double, Int)]()
+        while (i < m || j < n) {
+          if (i >= m) {
+            mergeList += ((temp.getValue(j), temp.getCounter(j)))
+            j = j + 1
+          } else if (j >= n || getValue(i) <= temp.getValue(j)) {
+            mergeList += (data.apply(i))
--- End diff --

Unnecessary parenthesis




[jira] [Resolved] (FLINK-2512) Add client.close() before throw RuntimeException

2015-08-15 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2512.
-
   Resolution: Fixed
Fix Version/s: 0.10

Fixed via c2b1eb7961ed16b8985d7f8c02e87c0964b818e8

 Add client.close() before throw RuntimeException
 

 Key: FLINK-2512
 URL: https://issues.apache.org/jira/browse/FLINK-2512
 Project: Flink
  Issue Type: Bug
  Components: flink-contrib
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor
 Fix For: 0.10






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Closed] (FLINK-2512) Add client.close() before throw RuntimeException

2015-08-15 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2512?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2512.
---

Thank you for the contribution

 Add client.close() before throw RuntimeException
 

 Key: FLINK-2512
 URL: https://issues.apache.org/jira/browse/FLINK-2512
 Project: Flink
  Issue Type: Bug
  Components: flink-contrib
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor
 Fix For: 0.10








[GitHub] flink pull request: [FLINK-2512]Add client.close() before throw Ru...

2015-08-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/1009




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37138063
  
--- Diff: docs/libs/ml/statistics.md ---
@@ -0,0 +1,108 @@
+---
+mathjax: include
+htmlTitle: FlinkML - Statistics
+title: <a href="../ml">FlinkML</a> - Statistics
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+ The statistics utility provides features such as building histograms over data, determining
+ mean, variance, gini impurity, entropy etc. of data.
+
+## Methods
+
+ The Statistics utility provides two major functions: `createHistogram` and `dataStats`.
+
+### Creating a histogram
+
+ There are two types of histograms:
+ <ul>
+  <li>
+   <strong>Continuous Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
+   when the values in `X` are from a continuous range. These histograms support
+   `quantile` and `sum` operations. Here `quantile(q)` refers to a value $x_q$ such that $|x: x
+   \leq x_q| = q * |X|$. Further, `sum(s)` refers to the number of elements $x \leq s$, which can
+   be construed as a cumulative probability value at $s$ [of course, a <i>scaled</i> probability].
+   <br>
+   A continuous histogram can be formed by calling `X.createHistogram(b)` where `b` is the
+   number of bins.
+  </li>
+  <li>
+   <strong>Categorical Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
+   when the values in `X` are from a discrete distribution. These histograms
+   support the `count(c)` operation, which returns the number of elements associated with category `c`.
+   <br>
+   A categorical histogram can be formed by calling `X.createHistogram(0)`.
+  </li>
+ </ul>
+
+### Data Statistics
+
+ The `dataStats` function operates on a data set `X: DataSet[Vector]` and returns column-wise
+ statistics for `X`. Every field of `X` is allowed to be defined as either <i>discrete</i> or
+ <i>continuous</i>.
+ <br>
+ Statistics can be evaluated by calling `DataStats.dataStats(X)` or
+ `DataStats.dataStats(X, discreteFields)`. The latter is used when some fields need to be
+ declared discrete-valued, and is provided as an array of indices of fields which are discrete.
+ <br>
+ The following information is available as part of `DataStats`:
+ <ul>
+  <li>Number of elements in `X`</li>
+  <li>Dimension of `X`</li>
+  <li>Column-wise statistics where for discrete fields, we report counts for each category, and
+   the Gini impurity and Entropy of the field, while for continuous fields, we report the
+   minimum, maximum, mean and variance.
+  </li>
+ </ul>
+
+## Examples
+
+{% highlight scala %}
+
+import org.apache.flink.ml.statistics._
+import org.apache.flink.ml._
+
+val X: DataSet[Double] = ...
+// Create continuous histogram
+val histogram = X.createHistogram(5) // creates a histogram with five bins
+histogram.quantile(0.3)              // returns the 30th quantile
+histogram.sum(4)                     // returns number of elements less than 4
+
+// Create categorical histogram
+val histogram = X.createHistogram(0) // creates a categorical histogram
+histogram.count(3)                   // number of elements with cateogory value 3
--- End diff --

`cateogory` -> `category`




[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on a diff in the pull request:

https://github.com/apache/flink/pull/861#discussion_r37138093
  
--- Diff: docs/libs/ml/statistics.md ---
@@ -0,0 +1,108 @@
+---
+mathjax: include
+htmlTitle: FlinkML - Statistics
+title: <a href="../ml">FlinkML</a> - Statistics
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+ The statistics utility provides features such as building histograms over data, determining
+ mean, variance, gini impurity, entropy etc. of data.
+
+## Methods
+
+ The Statistics utility provides two major functions: `createHistogram` and `dataStats`.
+
+### Creating a histogram
+
+ There are two types of histograms:
+ <ul>
+  <li>
+   <strong>Continuous Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
+   when the values in `X` are from a continuous range. These histograms support
+   `quantile` and `sum` operations. Here `quantile(q)` refers to a value $x_q$ such that $|x: x
+   \leq x_q| = q * |X|$. Further, `sum(s)` refers to the number of elements $x \leq s$, which can
+   be construed as a cumulative probability value at $s$ [of course, a <i>scaled</i> probability].
+   <br>
+   A continuous histogram can be formed by calling `X.createHistogram(b)` where `b` is the
+   number of bins.
+  </li>
+  <li>
+   <strong>Categorical Histograms</strong>: These histograms are formed on a data set `X: DataSet[Double]`
+   when the values in `X` are from a discrete distribution. These histograms
+   support the `count(c)` operation, which returns the number of elements associated with category `c`.
+   <br>
+   A categorical histogram can be formed by calling `X.createHistogram(0)`.
+  </li>
+ </ul>
+
+### Data Statistics
+
+ The `dataStats` function operates on a data set `X: DataSet[Vector]` and returns column-wise
+ statistics for `X`. Every field of `X` is allowed to be defined as either <i>discrete</i> or
+ <i>continuous</i>.
+ <br>
+ Statistics can be evaluated by calling `DataStats.dataStats(X)` or
+ `DataStats.dataStats(X, discreteFields)`. The latter is used when some fields need to be
+ declared discrete-valued, and is provided as an array of indices of fields which are discrete.
+ <br>
+ The following information is available as part of `DataStats`:
+ <ul>
+  <li>Number of elements in `X`</li>
+  <li>Dimension of `X`</li>
+  <li>Column-wise statistics where for discrete fields, we report counts for each category, and
+   the Gini impurity and Entropy of the field, while for continuous fields, we report the
+   minimum, maximum, mean and variance.
+  </li>
+ </ul>
+
+## Examples
+
+{% highlight scala %}
+
+import org.apache.flink.ml.statistics._
+import org.apache.flink.ml._
+
+val X: DataSet[Double] = ...
+// Create continuous histogram
+val histogram = X.createHistogram(5) // creates a histogram with five bins
+histogram.quantile(0.3)              // returns the 30th quantile
+histogram.sum(4)                     // returns number of elements less than 4
--- End diff --

Is this example valid? `RichDoubleDataSet.createHistogram` returns 
`DataSet[OnlineHistogram]`, which doesn't have methods such as `quantile`, 
`sum`, etc.




[jira] [Commented] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set

2015-08-15 Thread Vasia Kalavri (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698399#comment-14698399
 ] 

Vasia Kalavri commented on FLINK-2527:
--

I think (3) would break the model semantics. I'm leaning towards (1). [~ggevay] 
do you have any case in mind that (2) would allow to implement but (1) wouldn't?

 If a VertexUpdateFunction calls setNewVertexValue more than once, the 
 MessagingFunction will only see the first value set
 -

 Key: FLINK-2527
 URL: https://issues.apache.org/jira/browse/FLINK-2527
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 0.10, 0.9.1


 The problem is that if setNewVertexValue is called more than once, it sends 
 each new value to the out Collector, and these all end up in the workset, but 
 then the coGroups in the two descendants of MessagingUdfWithEdgeValues use 
 only the first value in the state Iterable. I see three ways to resolve this:
 1. Add it to the documentation that setNewVertexValue should only be called 
 once, and optionally add a check for this.
 2. In setNewVertexValue, do not send the newValue to the out Collector at 
 once, but only record it in outVal, and send the last recorded value after 
 updateVertex returns.
 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup 
 and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need 
 some documentation addition.)
 I like 2. the best. What are your opinions?
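Option (2) could be sketched roughly as follows (a hypothetical simplification: `VertexUpdateBuffer` and `flush` are illustrative names, not Gelly API; the real change would live inside the `VertexUpdateFunction` wrapper):

```scala
import scala.collection.mutable

/** Buffers calls to setNewVertexValue and emits only the last recorded value, once. */
class VertexUpdateBuffer[VV](emit: VV => Unit) {
  private var pending: Option[VV] = None

  def setNewVertexValue(value: VV): Unit = {
    pending = Some(value) // overwrite instead of sending to the collector immediately
  }

  /** Called by the framework after updateVertex returns. */
  def flush(): Unit = {
    pending.foreach(emit)
    pending = None
  }
}

object VertexUpdateBufferDemo {
  def main(args: Array[String]): Unit = {
    val workset = mutable.ArrayBuffer[Int]()
    val buf = new VertexUpdateBuffer[Int](workset += _)
    buf.setNewVertexValue(1)
    buf.setNewVertexValue(2) // only this value should reach the workset
    buf.flush()
    println(workset) // prints ArrayBuffer(2)
  }
}
```

This keeps the "last write wins" semantics of option (2) without changing what the coGroups downstream see.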





[GitHub] flink pull request: [Flink-2030][ml]Data Set Statistics and Histog...

2015-08-15 Thread chiwanpark
Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/861#issuecomment-131425060
  
Hi @sachingoel0101, Thanks for your contribution. I reviewed this PR and 
commented the source code.

There are some problems which aren't commented on. In the documentation, there are 
many lines using `a`, `ul`, `li`, `br`, `strong` and `i` tags. 
These can be replaced with markdown syntax. Also, some code 
(`MLUtils.createHistogram`, `ColumnStatistics`) is not formatted.

I haven't finished reviewing the histogram-building process. After reading the full 
source code and the given paper, I'll add comments about it.




[GitHub] flink pull request: [FLINK-2526]Add try-catch for task when it sto...

2015-08-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1020#issuecomment-131390701
  
The pull request #1017 actually fixes this as well, together with a lot of 
other things (similar problems are in many places).

Can you have a look at #1017 and see if that would solve your concern?




[jira] [Commented] (FLINK-2526) Add catch{} for task when it stop running

2015-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2526?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14698306#comment-14698306
 ] 

ASF GitHub Bot commented on FLINK-2526:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1020#issuecomment-131390701
  
The pull request #1017 actually fixes this as well, together with a lot of 
other things (similar problems are in many places).

Can you have a look at #1017 and see if that would solve your concern?


 Add catch{} for task when it stop running 
 --

 Key: FLINK-2526
 URL: https://issues.apache.org/jira/browse/FLINK-2526
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor







[jira] [Resolved] (FLINK-2486) Remove unwanted null check in removeInstance function

2015-08-15 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2486.
-
Resolution: Won't Fix

Valid sanity check, won't fix.

 Remove unwanted null check in removeInstance function
 -

 Key: FLINK-2486
 URL: https://issues.apache.org/jira/browse/FLINK-2486
 Project: Flink
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 0.8.1
Reporter: fangfengbin
Priority: Minor







[jira] [Closed] (FLINK-2486) Remove unwanted null check in removeInstance function

2015-08-15 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2486?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2486.
---

 Remove unwanted null check in removeInstance function
 -

 Key: FLINK-2486
 URL: https://issues.apache.org/jira/browse/FLINK-2486
 Project: Flink
  Issue Type: Bug
  Components: Scheduler
Affects Versions: 0.8.1
Reporter: fangfengbin
Priority: Minor







[jira] [Resolved] (FLINK-2516) Remove unwanted log.isInfoEnabled check

2015-08-15 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-2516.
-
Resolution: Won't Fix

As per discussion in the issue, this will not be changed.

 Remove unwanted log.isInfoEnabled check
 ---

 Key: FLINK-2516
 URL: https://issues.apache.org/jira/browse/FLINK-2516
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor







[jira] [Closed] (FLINK-2516) Remove unwanted log.isInfoEnabled check

2015-08-15 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-2516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-2516.
---

 Remove unwanted log.isInfoEnabled check
 ---

 Key: FLINK-2516
 URL: https://issues.apache.org/jira/browse/FLINK-2516
 Project: Flink
  Issue Type: Bug
  Components: Distributed Runtime
Affects Versions: 0.8.1
Reporter: fangfengbin
Assignee: fangfengbin
Priority: Minor







[jira] [Commented] (FLINK-2462) Wrong exception reporting in streaming jobs

2015-08-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698413#comment-14698413
 ] 

ASF GitHub Bot commented on FLINK-2462:
---

Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1017#issuecomment-131431977
  
Fixed the issues with the tests. Builds locally, waiting for Travis to 
confirm.


 Wrong exception reporting in streaming jobs
 ---

 Key: FLINK-2462
 URL: https://issues.apache.org/jira/browse/FLINK-2462
 Project: Flink
  Issue Type: Bug
  Components: Streaming
Affects Versions: 0.10
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Blocker
 Fix For: 0.10


 When streaming tasks fail and are canceled, they report a plethora of 
 follow-up exceptions.
 The batch operators have a clear model that ensures root causes are 
 reported and follow-up exceptions are suppressed. That makes debugging much 
 easier.
 A big part of that is having a single, consistent place that logs 
 exceptions and knows whether the operation is still running or has been 
 canceled.
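
 The pattern described above can be sketched roughly as follows. This is a 
 hypothetical illustration, not the actual Flink implementation: a single 
 error sink keeps the first exception reported while the task is still 
 running and swallows follow-up exceptions that arrive after cancellation.

 ```java
 import java.util.concurrent.atomic.AtomicBoolean;

 // Hypothetical sketch of a central error-reporting place that knows
 // whether the task is still running or has been canceled.
 public class TaskErrorReporter {
     private final AtomicBoolean running = new AtomicBoolean(true);
     private volatile Throwable rootCause;

     /** Called on cancellation; later errors are treated as follow-ups. */
     public void cancel() {
         running.set(false);
     }

     /** Central error sink: keep only the first error of a running task. */
     public void reportError(Throwable t) {
         if (running.get() && rootCause == null) {
             rootCause = t;  // root cause: this is the one worth reporting
         }
         // otherwise: follow-up exception after failure/cancellation, swallow
     }

     public Throwable getRootCause() {
         return rootCause;
     }
 }
 ```

 With this shape, an exception thrown during cancellation never masks the 
 original failure in the logs.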





[GitHub] flink pull request: [FLINK-2462] [streaming] Major cleanup of stre...

2015-08-15 Thread StephanEwen
Github user StephanEwen commented on the pull request:

https://github.com/apache/flink/pull/1017#issuecomment-131431977
  
Fixed the issues with the tests. Builds locally, waiting for Travis to 
confirm.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[jira] [Commented] (FLINK-2527) If a VertexUpdateFunction calls setNewVertexValue more than once, the MessagingFunction will only see the first value set

2015-08-15 Thread Gabor Gevay (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-2527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14698423#comment-14698423
 ] 

Gabor Gevay commented on FLINK-2527:


I'm also leaning towards (1) now. I actually implemented (2) in the 
meantime, but then realized that it sets a subtle trap for the user, which I 
immediately fell into :)

In my user function, I have a loop over the messages, and for each message I 
decide whether to set a new vertex value. This loop might set a new value 
multiple times, and the last one should be retained.

At first, I liked (2) better, because with (1) I essentially have to 
implement (2) inside my user function anyway. I thought this situation is 
probably a common one, so why have everyone reimplement (2) inside their 
user functions if we can do it in Gelly? However, the trap is that my code 
inside the loop implicitly assumed that setNewVertexValue updates the vertex 
variable (the first parameter of the UDF), but it does not. Of course, we 
could make setNewVertexValue do this update, but that is getting 
complicated. So it is probably best to go with (1), to keep the API nice and 
simple.
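
The "implement (2) inside the user function" pattern with option (1) can be 
sketched like this. This is a hedged illustration with made-up names, not 
the Gelly API: the UDF keeps the latest candidate value in a local variable 
and calls the single-shot setter exactly once, after the message loop.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of last-write-wins inside a vertex-update function.
public class LastWriteWinsUpdate {
    private Double newValue;      // set at most once, after the loop
    private double currentValue;  // local view of the vertex value

    public LastWriteWinsUpdate(double initial) {
        this.currentValue = initial;
    }

    public void updateVertex(List<Double> messages) {
        Double candidate = null;       // local "outVal" inside the UDF
        for (double msg : messages) {
            if (msg < currentValue) {  // per-message decision, e.g. keep minimum
                candidate = msg;
                currentValue = msg;    // keep the local view consistent too
            }
        }
        if (candidate != null) {
            setNewVertexValue(candidate);  // called exactly once
        }
    }

    private void setNewVertexValue(double v) {
        this.newValue = v;
    }

    public Double getNewValue() {
        return newValue;
    }
}
```

Note that updating the local `currentValue` alongside the candidate is 
exactly the step that the trap described above makes easy to forget.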

 If a VertexUpdateFunction calls setNewVertexValue more than once, the 
 MessagingFunction will only see the first value set
 ----------------------------------------------------------------------

 Key: FLINK-2527
 URL: https://issues.apache.org/jira/browse/FLINK-2527
 Project: Flink
  Issue Type: Bug
  Components: Gelly
Reporter: Gabor Gevay
Assignee: Gabor Gevay
 Fix For: 0.10, 0.9.1


 The problem is that if setNewVertexValue is called more than once, it sends 
 each new value to the out Collector, and these all end up in the workset, but 
 then the coGroups in the two descendants of MessagingUdfWithEdgeValues use 
 only the first value in the state Iterable. I see three ways to resolve this:
 1. Add it to the documentation that setNewVertexValue should only be called 
 once, and optionally add a check for this.
 2. In setNewVertexValue, do not send the newValue to the out Collector at 
 once, but only record it in outVal, and send the last recorded value after 
 updateVertex returns.
 3. Iterate over the entire Iterable in MessagingUdfWithEVsSimpleVV.coGroup 
 and MessagingUdfWithEVsVVWithDegrees.coGroup. (This would probably still need 
 some documentation addition.)
 I like 2. the best. What are your opinions?
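
 Option 2 can be sketched as a deferring collector. This is a hypothetical 
 illustration (the names `outVal` and `flush` are assumptions, not the real 
 Gelly internals): setNewVertexValue only records the value, and the 
 framework emits the last recorded value after updateVertex returns.

 ```java
 import java.util.ArrayList;
 import java.util.List;

 // Hypothetical sketch of option (2): defer emission so that only the
 // last value passed to setNewVertexValue reaches the collector.
 public class DeferringCollector<T> {
     private final List<T> out = new ArrayList<>(); // stand-in for the Collector
     private T outVal;                              // last recorded value
     private boolean valueSet;

     /** Record the value instead of emitting it immediately. */
     public void setNewVertexValue(T value) {
         outVal = value;
         valueSet = true;
     }

     /** Called by the framework after updateVertex returns. */
     public void flush() {
         if (valueSet) {
             out.add(outVal);  // emit only the last recorded value
             valueSet = false;
         }
     }

     public List<T> getEmitted() {
         return out;
     }
 }
 ```

 The trap mentioned in the comment above remains: recording the value does 
 not change the vertex parameter the user function already holds.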


