[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708886#comment-14708886
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-134070592
  
Hey @ChengXiangLi, I have another concern, regarding the seed for sampling. 
It doesn't seem to serve its purpose. I tried sampling with a fraction three 
times with the same seed, but I got different results every time. 
I've been stuck on this problem myself for quite some time. The fix is to 
use `seed` instead of `seed + index`. But then the sample is 
not truly random. As far as I could figure out, the splits of data don't arrive 
at the same subtask index every time.


 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative or exact size of the sample, set a seed for 
 reproducibility, and support sampling within iterations.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708918#comment-14708918
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-134081197
  
Yes. I was only wondering if we should at least ensure this when it is done 
right at the source though.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708921#comment-14708921
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-134082960
  
IMHO, this makes understanding the semantics of the sampling operator only 
more complicated because it behaves differently depending on the job graph 
structure. I would rather document this limitation more prominently.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708867#comment-14708867
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-134063115
  
Thanks, @chiwanpark , waiting for your result. 
Besides, we can reduce the false-positive failures to an acceptable probability 
by widening the verification boundary, at the cost of making the 
verification weaker. 




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=1470#comment-1470
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-134070893
  
@thvasilo, the PR did not yet include the proper sampling behaviour within 
iterations. See FLINK-2396. 




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708889#comment-14708889
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-134071161
  
@sachingoel0101, you're right. The problem is that Flink does not give you 
any guarantee about the order in which the elements will arrive. But this 
problem won't be fixed by setting the seed for all sampling operators to the 
same value. There might always be an operator, e.g. rebalance, which will 
completely randomize your element order.
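
To illustrate the point about element order, here is a minimal sketch in plain Java (not Flink code). The class `SeedOrderDemo` and its `sample` method are hypothetical names introduced for this example, and the sampler is a simple Bernoulli sampler rather than the one in the PR. A seeded generator fixes which *positions* in the stream are selected, so the same seed only reproduces the same sample if the elements arrive in the same order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;

public class SeedOrderDemo {

    /** Keeps each element with probability {@code fraction}, driven by a seeded generator. */
    static List<Integer> sample(List<Integer> input, double fraction, long seed) {
        Random rnd = new Random(seed);
        List<Integer> out = new ArrayList<>();
        for (Integer x : input) {
            // The decision depends only on the position in the stream, not on the element.
            if (rnd.nextDouble() < fraction) {
                out.add(x);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> ordered = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        List<Integer> reversed = Arrays.asList(8, 7, 6, 5, 4, 3, 2, 1);
        // Same seed, same order: identical samples.
        System.out.println(sample(ordered, 0.5, 42L));
        System.out.println(sample(ordered, 0.5, 42L));
        // Same seed, different arrival order: the same positions are picked,
        // so the selected elements generally differ.
        System.out.println(sample(reversed, 0.5, 42L));
    }
}
```

The same reasoning applies per subtask: whatever seed a subtask uses, its output is reproducible only if it receives the same elements in the same order on every run.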




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708658#comment-14708658
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-133990256
  
Hi @sachingoel0101, when sampling with a fraction, it's not easy to verify 
whether the DataSet is sampled with the input fraction. In the test, I take 5 
samples, use the average size to compute the result fraction, then compare 
the result fraction with the input fraction and verify that their difference 
is not more than 10%. The following case may happen as well: the sampler 
samples the DataSet with the input fraction, but the sampled result size is so 
small or so large that it falls outside our verification condition. It happens, 
just with very low probability, say less than 0.001 in this test. It should be 
OK if this failure happens very occasionally; please let me know if you find 
that it's not.
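
The verification described above can be sketched roughly as follows. This is a standalone illustration, not the code of `RandomSamplerTest`: the class `FractionCheck`, its method names, the Bernoulli sampling loop, and the reading of "10%" as a relative tolerance are all assumptions made for the example.

```java
import java.util.Random;

public class FractionCheck {

    /** Bernoulli-samples {@code sourceSize} elements and returns how many were kept. */
    static int sampleSize(int sourceSize, double fraction, Random rnd) {
        int kept = 0;
        for (int i = 0; i < sourceSize; i++) {
            if (rnd.nextDouble() < fraction) {
                kept++;
            }
        }
        return kept;
    }

    /** Averages the sample size over {@code rounds} runs and converts it to a fraction. */
    static double averageFraction(int sourceSize, double fraction, int rounds, long seed) {
        Random rnd = new Random(seed);
        long total = 0;
        for (int r = 0; r < rounds; r++) {
            total += sampleSize(sourceSize, fraction, rnd);
        }
        return (double) total / rounds / sourceSize;
    }

    /** True if the relative deviation from the expected fraction is at most {@code tolerance}. */
    static boolean withinTolerance(double expected, double actual, double tolerance) {
        return Math.abs(actual - expected) / expected <= tolerance;
    }

    public static void main(String[] args) {
        double expected = 0.01;
        double actual = averageFraction(10000, expected, 5, System.nanoTime());
        System.out.println("result fraction: " + actual
            + ", within 10%: " + withinTolerance(expected, actual, 0.1));
    }
}
```

Because the sample size is itself random, a check like this can fail even when the sampler is correct. The relative spread shrinks as `fraction * sourceSize` and the number of rounds grow, which is why a larger source, more samples, or a wider tolerance all make spurious failures rarer.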




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708701#comment-14708701
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user chiwanpark commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-133999705
  
@ChengXiangLi I know that it is hard to verify a random sampler 
implementation. But we need to fix this failing test because it makes 
verifying other pull requests difficult. Builds of other pull requests have 
failed on the K-S test and on the sampling test with fraction. There is a 
[JIRA issue](https://issues.apache.org/jira/browse/FLINK-2564) covering this.

I'm testing with an increased number of samples and a larger source size. If 
I get a notable result, I'll post it.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-22 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14707972#comment-14707972
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-133675023
  
It's great to have this in, I'll try to update the cross-validation and SGD 
to use this.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706538#comment-14706538
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-133371873
  
Looks great. Thanks a lot for your contribution @ChengXiangLi. Will merge 
it now.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706741#comment-14706741
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/949




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706861#comment-14706861
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-133461987
  
Hey @ChengXiangLi , I just observed a failure on a test case: 
https://travis-ci.org/sachingoel0101/flink/jobs/76649177
Here is the relevant statement:
RandomSamplerTest.testPoissonSamplerFraction:116-verifySamplerFraction:249 
expected fraction: 0.01, result fraction: 0.009000




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704535#comment-14704535
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-132943584
  
Oh cool, then you were faster than me :-)




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704676#comment-14704676
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-132972593
  
Moved the sample/sampleWithSize operators to DataSetUtils and updated the unit tests.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704508#comment-14704508
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-132935282
  
@ChengXiangLi, you're right, I should have noticed earlier and raised a 
flag. But your work is not in vain. I think it's an excellent piece of work 
and the `sample` method could also become part of the core API right away.

For the sake of completeness, let's do it once the `sampleWithSize` method 
also works robustly. I think your proposal for the next steps is a good way 
to continue with it. Once you've moved the `sample` and `sampleWithSize` 
methods to the `DataSetUtils` class, we close and merge this PR. In the 
meantime, I'll create a JIRA for the topK operator, where we can discuss the 
matter further.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704530#comment-14704530
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-132943122
  
OK, @tillrohrmann, great to hear that. Besides, I've created 
[FLINK-2549](https://issues.apache.org/jira/browse/FLINK-2549) for the topK 
operator.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14703211#comment-14703211
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r37429899
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.Iterator;
+
+/**
+ * A sampler implementation based on Poisson Distribution. While sampling elements with fraction and replacement,
+ * the selected number of each element follows a given poisson distribution, so we could use poisson
+ * distribution to generate random variables for sample.
+ *
+ * @param <T> The type of sample.
+ * @see <a href="https://en.wikipedia.org/wiki/Poisson_distribution">https://en.wikipedia.org/wiki/Poisson_distribution</a>
+ */
+public class PoissonSampler<T> extends RandomSampler<T> {
+
+    private PoissonDistribution poissonDistribution;
+    private final double fraction;
+
+    /**
+     * Create a poisson sampler which would sample elements with replacement.
+     *
+     * @param fraction The expected count of each element.
+     * @param seed     Random number generator seed for internal PoissonDistribution.
+     */
+    public PoissonSampler(double fraction, long seed) {
+        Preconditions.checkArgument(fraction >= 0, "fraction should be positive.");
+        this.fraction = fraction;
+        if (this.fraction > 0) {
+            this.poissonDistribution = new PoissonDistribution(fraction);
+            this.poissonDistribution.reseedRandomGenerator(seed);
+        }
+    }
+
+    /**
+     * Create a poisson sampler which would sample elements with replacement.
+     *
+     * @param fraction The expected count of each element.
+     */
+    public PoissonSampler(double fraction) {
+        Preconditions.checkArgument(fraction >= 0, "fraction should be non-negative.");
+        this.fraction = fraction;
+        if (this.fraction > 0) {
+            this.poissonDistribution = new PoissonDistribution(fraction);
+        }
+    }
+
+    /**
+     * Sample the input elements, for each input element, generate its count with poisson distribution random variables generation.
+     *
+     * @param input Elements to be sampled.
+     * @return The sampled result which is lazy computed upon input elements.
+     */
+    @Override
+    public Iterator<T> sample(final Iterator<T> input) {
+        if (fraction == 0) {
+            return EMPTY_ITERABLE;
+        }
+
+        return new SampledIterator<T>() {
+            T currentElement;
+            int currentCount = 0;
+
+            @Override
+            public boolean hasNext() {
+                if (currentElement == null || currentCount == 0) {
+                    while (input.hasNext()) {
+                        currentElement = input.next();
+                        currentCount = poissonDistribution.sample();
+                        if (currentCount > 0) {
+                            return true;
+                        }
+                    }
+                    return false;
+                }
+                return true;
+            }
+

[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14703347#comment-14703347
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-132692640
  
Sorry for the late review @ChengXiangLi. I finished it now and the PR looks 
really good. 

There is only one thing I would like to change before merging it. This is 
to move the `sample` and the `sampleWithSize` methods from the `DataSet` to the 
`DataSetUtils` class. This will effectively make the `sample` methods not part 
of the core API. The reason for this is that the `sampleWithSize` method breaks 
one of Flink's guarantees, which is the robustness of the core API 
functions against `OutOfMemoryError`s. Let me elaborate on it for better 
understanding.

All of Flink's internal operations work on serialized data which is stored in 
Flink's *managed* memory. The managed memory allows Flink to detect when to 
spill elements from memory to disk to avoid out of memory errors and, thus, 
to make the system robust. The managed memory is a pre-allocated area of memory 
which is administered by the `MemoryManager`. The `MemoryManager` allows you to 
allocate and deallocate `MemorySegments` in a C-style fashion. However, once a 
data item enters a UDF, the item has to be deserialized, which puts it on the 
remaining heap. This is not bad if your UDF does not accumulate these elements. 
However, the `sampleWithSize` method materializes up to `numSamples` elements 
on the heap. Depending on the number of samples and the data item size, this 
might be enough to eat up all remaining heap memory space and to crash the JVM.

I think that your current implementation will work for most use cases but 
in order to make it part of the core API, we also have to deal with the case 
where our sample cannot be materialized on the remaining heap of a running 
Flink program. In order to achieve this, I think it would be necessary to 
implement a native `topK` operator. By *native* I mean an operator which 
works on Flink's managed memory and, thus, can also deal with spilling records 
to disk. Having such a `topK` operator, we could reimplement the reservoir 
sampling algorithm in the following way: For sampling without replacement we 
first assign weights in a map operation to each element. Then we call `topK` 
with respect to the weights and obtain the sample. For sampling with 
replacement we could simply use a flat map operation to assign `numSamples` 
times a weight to each element. Then we again call `topK` with respect to the 
weight.
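
The weight-then-topK idea can be sketched in plain Java as below. This is only an on-heap illustration of the steps described above, so it has exactly the memory limitation under discussion; it is not the proposed native operator. The class `WeightedTopKSample`, the `Weighted` holder, and all method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

public class WeightedTopKSample {

    /** An element paired with the random weight assigned in the map / flat map step. */
    static final class Weighted<T> {
        final T value;
        final double weight;

        Weighted(T value, double weight) {
            this.value = value;
            this.weight = weight;
        }
    }

    /** The topK step: keeps the k entries with the largest weights using a min-heap of size k. */
    static <T> List<T> topK(Iterable<Weighted<T>> weighted, int k) {
        List<T> result = new ArrayList<>();
        if (k <= 0) {
            return result;
        }
        PriorityQueue<Weighted<T>> heap =
            new PriorityQueue<>(k, Comparator.comparingDouble((Weighted<T> w) -> w.weight));
        for (Weighted<T> w : weighted) {
            if (heap.size() < k) {
                heap.add(w);
            } else if (w.weight > heap.peek().weight) {
                heap.poll();   // evict the current smallest weight
                heap.add(w);
            }
        }
        for (Weighted<T> w : heap) {
            result.add(w.value);
        }
        return result;
    }

    /** Map step: one weight per element, then topK. */
    static <T> List<T> sampleWithoutReplacement(List<T> input, int numSamples, long seed) {
        Random rnd = new Random(seed);
        List<Weighted<T>> weighted = new ArrayList<>();
        for (T t : input) {
            weighted.add(new Weighted<>(t, rnd.nextDouble()));
        }
        return topK(weighted, numSamples);
    }

    /** Flat map step: numSamples weights per element, then topK. */
    static <T> List<T> sampleWithReplacement(List<T> input, int numSamples, long seed) {
        Random rnd = new Random(seed);
        List<Weighted<T>> weighted = new ArrayList<>();
        for (T t : input) {
            for (int i = 0; i < numSamples; i++) {
                weighted.add(new Weighted<>(t, rnd.nextDouble()));
            }
        }
        return topK(weighted, numSamples);
    }
}
```

In this sketch the heap never holds more than `numSamples` entries, but those entries are deserialized objects on the JVM heap. A native operator would instead keep the records in managed memory and store only priorities and pointers in the queue.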

For the topK implementation, we would need something like a `PriorityQueue` 
which operates on managed memory (similar to the `CompactingHashTable` which is 
a hash table working on managed memory). Thus, we would have a priority queue 
which stores the priority values of each record and a pointer to the record 
which is kept in managed memory. Whenever an element is removed from the 
priority queue, we can also free the occupied managed memory. In case we 
run out of managed memory, we have to spill some of the records to disk which 
are still in the race for the top k. As a first step, we can skip the spilling 
and just throw a proper exception (other than `OutOfMemoryError`) when we 
run out of memory. Afterwards, we can incrementally add the spilling 
functionality.

I know that you've already put a lot of effort into writing the sampling 
operator and that this result might be a little bit demotivating. However, if 
we want to make it right and robust, then I think this is the way to go. 
Additionally, we would add a proper topK operator to Flink's API, which is 
missing big time :-) If you want to, you could also take the lead here. 
The further discussion should then happen in a separate issue. I'm more than 
willing to assist you in implementing this operator. What do you think?




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-19 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704214#comment-14704214
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-132864761
  
Thanks for the detailed explanation, @tillrohrmann. As the owner of this 
issue, making it work correctly and efficiently is my top priority, so I would 
never say no to a reasonable and proper suggestion (although I did expect it 
to come earlier :-)). `OutOfMemoryError` was one of my concerns during the 
implementation as well. Unlimited usage of the Java heap is unacceptable, so I 
limited the reservoir size (the `PriorityQueue` size in the implementation) to 
the `numSamples` parameter, which gives the user full control of the Java heap 
memory used by the `sample` operator. It may lead to an `OutOfMemoryError` if 
`numSamples` is too large, but other operators may lead to an `OutOfMemoryError` 
as well if the user caches too many objects inside a UDF. Anyway, that was just 
my first thought; fully managed memory usage is a better solution, and I would 
like to implement that.
As mentioned in the previous comment, are these the next steps you prefer?
1. Move the `sample` and `sampleWithSize` methods from `DataSet` to 
the `DataSetUtils` class, and merge this PR.
2. Implement the `topK` operator in a separate PR.
3. Update the `sampleWithSize` implementation, and move it back to `DataSet`.
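
For readers following the memory discussion, the bounded-reservoir idea can be 
sketched roughly as below. This is a minimal illustration and not the code from 
the PR: the class name `BoundedReservoirSketch`, the `Weighted` helper and the 
`sample(input, numSamples, seed)` signature are all hypothetical. It assumes 
each element is given a random weight and only the `numSamples` largest weights 
are kept, so the heap never holds more than `numSamples` entries.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

// Hypothetical sketch: fixed-size sampling without replacement, with the
// reservoir (a PriorityQueue) capped at numSamples entries.
final class BoundedReservoirSketch {

    // Pairs an element with the random weight drawn for it.
    private static final class Weighted<T> {
        final T element;
        final double weight;

        Weighted(T element, double weight) {
            this.element = element;
            this.weight = weight;
        }
    }

    static <T> List<T> sample(Iterator<T> input, int numSamples, long seed) {
        if (numSamples < 0) {
            throw new IllegalArgumentException("numSamples must be non-negative.");
        }
        List<T> result = new ArrayList<>();
        if (numSamples == 0) {
            return result;
        }

        Random random = new Random(seed);
        // Min-heap on weight: the head is the smallest weight currently retained.
        PriorityQueue<Weighted<T>> reservoir = new PriorityQueue<>(
                numSamples, (a, b) -> Double.compare(a.weight, b.weight));

        while (input.hasNext()) {
            T element = input.next();
            double weight = random.nextDouble();
            if (reservoir.size() < numSamples) {
                // Reservoir not full yet: always keep the element.
                reservoir.add(new Weighted<>(element, weight));
            } else if (weight > reservoir.peek().weight) {
                // Reservoir full: evict the smallest weight, keep the new element.
                reservoir.poll();
                reservoir.add(new Weighted<>(element, weight));
            }
        }

        for (Weighted<T> w : reservoir) {
            result.add(w.element);
        }
        return result;
    }
}
```

Keeping the elements with the largest random weights gives every element the 
same chance of ending up in the sample, and heap usage grows with `numSamples` 
rather than with the input size. A large `numSamples` can still exhaust the 
heap, which is the concern raised above.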





[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14701026#comment-14701026
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-132152536
  
Hi @tillrohrmann, do you have some time to continue reviewing this PR and 
help move it forward?




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14702304#comment-14702304
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r37373497
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.Iterator;
+
+/**
+ * A sampler implementation based on Poisson Distribution. While sampling 
elements with fraction and replacement,
+ * the selected number of each element follows a given poisson 
distribution, so we could use poisson
+ * distribution to generate random variables for sample.
+ *
+ * @param <T> The type of sample.
+ * @see <a 
href="https://en.wikipedia.org/wiki/Poisson_distribution">https://en.wikipedia.org/wiki/Poisson_distribution</a>
+ */
+public class PoissonSampler<T> extends RandomSampler<T> {
+   
+   private PoissonDistribution poissonDistribution;
+   private final double fraction;
+   
+   /**
+* Create a poisson sampler which would sample elements with 
replacement.
+*
+* @param fraction The expected count of each element.
+* @param seed Random number generator seed for internal 
PoissonDistribution.
+*/
+   public PoissonSampler(double fraction, long seed) {
+   Preconditions.checkArgument(fraction >= 0, "fraction should be 
positive.");
+   this.fraction = fraction;
+   if (this.fraction > 0) {
+   this.poissonDistribution = new 
PoissonDistribution(fraction);
+   this.poissonDistribution.reseedRandomGenerator(seed);
+   }
+   }
+   
+   /**
+* Create a poisson sampler which would sample elements with 
replacement.
+*
+* @param fraction The expected count of each element.
+*/
+   public PoissonSampler(double fraction) {
+   Preconditions.checkArgument(fraction >= 0, "fraction should be 
non-negative.");
+   this.fraction = fraction;
+   if (this.fraction > 0) {
+   this.poissonDistribution = new 
PoissonDistribution(fraction);
+   }
+   }
+   
+   /**
+* Sample the input elements, for each input element, generate its 
count with poisson distribution random variables generation.
+*
+* @param input Elements to be sampled.
+* @return The sampled result which is lazy computed upon input 
elements.
+*/
+   @Override
+   public Iterator<T> sample(final Iterator<T> input) {
+   if (fraction == 0) {
+   return EMPTY_ITERABLE;
+   }
+   
+   return new SampledIterator<T>() {
+   T currentElement;
+   int currentCount = 0;
+   
+   @Override
+   public boolean hasNext() {
+   if (currentElement == null || currentCount == 
0) {
+   while (input.hasNext()) {
+   currentElement = input.next();
+   currentCount = 
poissonDistribution.sample();
+   if (currentCount > 0) {
+   return true;
+   }
+   }
+   return false;
+   }
+   return true;
+   }
+   

[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-18 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14701440#comment-14701440
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r37313995
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.Iterator;
+
+/**
+ * A sampler implementation based on Poisson Distribution. While sampling 
elements with fraction and replacement,
+ * the selected number of each element follows a given poisson 
distribution, so we could use poisson
+ * distribution to generate random variables for sample.
+ *
+ * @param <T> The type of sample.
+ * @see <a 
href="https://en.wikipedia.org/wiki/Poisson_distribution">https://en.wikipedia.org/wiki/Poisson_distribution</a>
+ */
+public class PoissonSampler<T> extends RandomSampler<T> {
+   
+   private PoissonDistribution poissonDistribution;
+   private final double fraction;
+   
+   /**
+* Create a poisson sampler which would sample elements with 
replacement.
+*
+* @param fraction The expected count of each element.
+* @param seed Random number generator seed for internal 
PoissonDistribution.
+*/
+   public PoissonSampler(double fraction, long seed) {
+   Preconditions.checkArgument(fraction >= 0, "fraction should be 
positive.");
+   this.fraction = fraction;
+   if (this.fraction > 0) {
+   this.poissonDistribution = new 
PoissonDistribution(fraction);
+   this.poissonDistribution.reseedRandomGenerator(seed);
+   }
+   }
+   
+   /**
+* Create a poisson sampler which would sample elements with 
replacement.
+*
+* @param fraction The expected count of each element.
+*/
+   public PoissonSampler(double fraction) {
+   Preconditions.checkArgument(fraction >= 0, "fraction should be 
non-negative.");
+   this.fraction = fraction;
+   if (this.fraction > 0) {
+   this.poissonDistribution = new 
PoissonDistribution(fraction);
+   }
+   }
+   
+   /**
+* Sample the input elements, for each input element, generate its 
count with poisson distribution random variables generation.
+*
+* @param input Elements to be sampled.
+* @return The sampled result which is lazy computed upon input 
elements.
+*/
+   @Override
+   public Iterator<T> sample(final Iterator<T> input) {
+   if (fraction == 0) {
+   return EMPTY_ITERABLE;
+   }
+   
+   return new SampledIterator<T>() {
+   T currentElement;
+   int currentCount = 0;
+   
+   @Override
+   public boolean hasNext() {
+   if (currentElement == null || currentCount == 
0) {
+   while (input.hasNext()) {
+   currentElement = input.next();
+   currentCount = 
poissonDistribution.sample();
+   if (currentCount > 0) {
+   return true;
+   }
+   }
+   return false;
+   }
+   return true;
+   }
+   

[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14696619#comment-14696619
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-131001106
  
Thanks for the review, @thvasilo , fixed all comments in latest commit.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14696632#comment-14696632
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r37054584
  
--- Diff: 
flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.util.{List => JavaList, Random}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, After, Test}
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SampleITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) {
+  private val RNG: Random = new Random
+
+  private var result: JavaList[String] = null;
+
+  @Before
+  def initiate {
+ExecutionEnvironment.getExecutionEnvironment.setParallelism(5)
+  }
+
+  @After
+  def after() = {
+TestBaseUtils.containsResultAsText(result, getSourceStrings)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithoutReplacement {
+verifySamplerWithFractionWithoutReplacement(0d)
+verifySamplerWithFractionWithoutReplacement(0.2d)
+verifySamplerWithFractionWithoutReplacement(1.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithReplacement {
+verifySamplerWithFractionWithReplacement(0d)
+verifySamplerWithFractionWithReplacement(0.2d)
+verifySamplerWithFractionWithReplacement(1.0d)
+verifySamplerWithFractionWithReplacement(2.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithoutReplacement {
+verifySamplerWithFixedSizeWithoutReplacement(0)
+verifySamplerWithFixedSizeWithoutReplacement(2)
+verifySamplerWithFixedSizeWithoutReplacement(21)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithReplacement {
+verifySamplerWithFixedSizeWithReplacement(0)
+verifySamplerWithFixedSizeWithReplacement(2)
+verifySamplerWithFixedSizeWithReplacement(21)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double) {
+verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double, seed: Long) {
+verifySamplerWithFraction(false, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double) {
+verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double, 
seed: Long) {
+verifySamplerWithFraction(true, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFraction(withReplacement: Boolean, 
fraction: Double, seed: Long) {
+val ds = getSourceDataSet()
+val sampled = ds.sample(withReplacement, fraction, seed)
+result = sampled.collect.asJava
--- End diff --

Thanks, I missed that.



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14694886#comment-14694886
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36950634
  
--- Diff: 
flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala ---
@@ -1182,6 +1184,60 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) {
 getCallLocationName()))
 
   // 

+  //  Sample
+  // 

+  /**
+   * Generate a sample of DataSet by the probability fraction of each 
element.
+   *
+   * @param withReplacement Whether element can be selected more than once.
+   * @param fraction        Probability that each element is chosen, 
should be [0,1] without
+   *                        replacement, and [0, ∞) with replacement. 
While fraction is larger
+   *                        than 1, the elements are expected to be 
selected multi times into
+   *                        sample on average.
+   * @param seed            Random number generator seed.
+   * @return The sampled DataSet
+   */
+  def sample(
+  withReplacement: Boolean,
+  fraction: Double,
+  seed: Long = Utils.RNG.nextLong()): DataSet[T] = {
+
+wrap(new MapPartitionOperator[T, T](javaSet,
+  getType(),
+  new SampleWithFraction(withReplacement, fraction, seed),
+  getCallLocationName()))
+  }
+
+  /**
+   * Generate a sample of DataSet by the probability fraction of each 
element.
--- End diff --

This Javadoc is copied from the fraction-based `sample` function.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14694981#comment-14694981
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36957769
  
--- Diff: 
flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.util.{List => JavaList, Random}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, After, Test}
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SampleITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) {
+  private val RNG: Random = new Random
+
+  private var result: JavaList[String] = null;
+
+  @Before
+  def initiate {
+ExecutionEnvironment.getExecutionEnvironment.setParallelism(5)
+  }
+
+  @After
+  def after() = {
+TestBaseUtils.containsResultAsText(result, getSourceStrings)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithoutReplacement {
+verifySamplerWithFractionWithoutReplacement(0d)
+verifySamplerWithFractionWithoutReplacement(0.2d)
+verifySamplerWithFractionWithoutReplacement(1.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithReplacement {
+verifySamplerWithFractionWithReplacement(0d)
+verifySamplerWithFractionWithReplacement(0.2d)
+verifySamplerWithFractionWithReplacement(1.0d)
+verifySamplerWithFractionWithReplacement(2.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithoutReplacement {
+verifySamplerWithFixedSizeWithoutReplacement(0)
+verifySamplerWithFixedSizeWithoutReplacement(2)
+verifySamplerWithFixedSizeWithoutReplacement(21)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithReplacement {
+verifySamplerWithFixedSizeWithReplacement(0)
+verifySamplerWithFixedSizeWithReplacement(2)
+verifySamplerWithFixedSizeWithReplacement(21)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double) {
+verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double, seed: Long) {
+verifySamplerWithFraction(false, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double) {
+verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double, 
seed: Long) {
+verifySamplerWithFraction(true, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFraction(withReplacement: Boolean, 
fraction: Double, seed: Long) {
+val ds = getSourceDataSet()
+val sampled = ds.sample(withReplacement, fraction, seed)
+result = sampled.collect.asJava
--- End diff --

Is this result checked for validity somewhere?



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14695031#comment-14695031
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36960080
  
--- Diff: 
flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
@@ -451,6 +454,53 @@ protected static File asFile(String path) {
assertEquals(extectedStrings[i], resultStrings[i]);
}
}
+   
+   // 

+   // Comparison methods for tests using sample
+   // 

+   
+   public static <T> void containsResultAsTuples(List<T> result, String 
expected) {
--- End diff --

Is this used anywhere?




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14694940#comment-14694940
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36955412
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli Trail. For sample with 
fraction and without replacement,
+ * each element sample choice is just a bernoulli trail.
--- End diff --

Do you mean Bernoulli _trial_ here?




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14694943#comment-14694943
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36955515
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli Trail. For sample with 
fraction and without replacement,
+ * each element sample choice is just a bernoulli trail.
+ *
+ * @param <T> The type of sample.
+ */
+public class BernoulliSampler<T> extends RandomSampler<T> {
+   
+   private final double fraction;
+   private final Random random;
+   
+   /**
+* Create a bernoulli sampler sample fraction and default random number 
generator.
--- End diff --

*B*ernoulli should be capitalized in all mentions.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14695034#comment-14695034
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36960527
  
--- Diff: 
flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---
@@ -451,6 +454,53 @@ protected static File asFile(String path) {
assertEquals(extectedStrings[i], resultStrings[i]);
}
}
+   
+   // 

+   // Comparison methods for tests using sample
+   // 

+   
+   public static <T> void containsResultAsTuples(List<T> result, String 
expected) {
+   isExpectedContainsResult(result, expected, true);
+   }
+   
+   public static <T> void containsResultAsText(List<T> result, String 
expected) {
+   isExpectedContainsResult(result, expected, false);
+   }
+   
+   private static <T> void isExpectedContainsResult(List<T> result, String 
expected, boolean asTuple) {
--- End diff --

Can we get comments explaining the functionality of this and 
`containsResultAsText`?




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14695003#comment-14695003
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36958345
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli Trail. For sample with 
fraction and without replacement,
+ * each element sample choice is just a bernoulli trail.
+ *
+ * @param <T> The type of sample.
+ */
+public class BernoulliSampler<T> extends RandomSampler<T> {
+   
+   private final double fraction;
+   private final Random random;
+   
+   /**
+* Create a bernoulli sampler sample fraction and default random number 
generator.
+*
+* @param fraction Sample fraction, aka the bernoulli sampler 
possibility.
+*/
+   public BernoulliSampler(double fraction) {
+   this(fraction, new Random());
+   }
+   
+   /**
+* Create a bernoulli sampler sample fraction and random number 
generator seed.
+*
+* @param fraction Sample fraction, aka the bernoulli sampler 
possibility.
+* @param seed Random number generator seed.
+*/
+   public BernoulliSampler(double fraction, long seed) {
+   this(fraction, new Random(seed));
+   }
+   
+   /**
+* Create a bernoulli sampler sample fraction and random number 
generator.
+*
+* @param fraction Sample fraction, aka the bernoulli sampler 
possibility.
+* @param random   The random number generator.
+*/
+   public BernoulliSampler(double fraction, Random random) {
+   Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, 
"fraction fraction must between [0, 1].");
+   this.fraction = fraction;
+   this.random = random;
+   }
+   
+   /**
+* Sample the input elements, for each input element, take a Bernoulli 
Trail for sample.
+*
+* @param input Elements to be sampled.
+* @return The sampled result which is lazy computed upon input 
elements.
+*/
+   @Override
+   public Iterator<T> sample(final Iterator<T> input) {
+   if (fraction == 0) {
+   return EMPTY_ITERABLE;
+   }
+   
+   return new SampledIterator<T>() {
+   T current;
+   
+   @Override
+   public boolean hasNext() {
+   if (current == null) {
+   while (input.hasNext()) {
+   T element = input.next();
+   if (random.nextDouble() <= 
fraction) {
+   current = element;
+   return true;
+   }
+   }
+   current = null;
+   return false;
+   } else {
+   return true;
+   }
+   }
+   
+   @Override
+   public T next() {
--- End diff --

It feels a bit counterintuitive that the next element is prepared in the 
`hasNext()` function. Doesn't this mean that `hasNext()` **needs** to be called 
every time before we call 
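
The concern raised in this comment can be illustrated with a minimal sketch of a look-ahead iterator whose `next()` calls `hasNext()` itself, so callers are not forced to call `hasNext()` first. The class name `LookAheadSampleIterator` and the separate `hasCurrent` flag are assumptions made for illustration; this is not the `SampledIterator` from the pull request.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;

/**
 * Hypothetical look-ahead sampling iterator (illustration only, not the
 * SampledIterator from the pull request).
 */
final class LookAheadSampleIterator<T> implements Iterator<T> {
    private final Iterator<T> input;
    private final double fraction;
    private final Random random;
    private T current;          // buffered element, valid only while hasCurrent is true
    private boolean hasCurrent; // explicit flag, so null elements are not mistaken for "empty"

    LookAheadSampleIterator(Iterator<T> input, double fraction, long seed) {
        this.input = input;
        this.fraction = fraction;
        this.random = new Random(seed);
    }

    @Override
    public boolean hasNext() {
        // Advance the input until an element passes its Bernoulli trial.
        while (!hasCurrent && input.hasNext()) {
            T element = input.next();
            if (random.nextDouble() < fraction) {
                current = element;
                hasCurrent = true;
            }
        }
        return hasCurrent;
    }

    @Override
    public T next() {
        // Calling hasNext() here makes next() safe without a prior hasNext() call.
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = current;
        current = null;
        hasCurrent = false;
        return result;
    }
}
```

With this shape, `hasNext()` stays idempotent (repeated calls do not skip elements), and `next()` throws `NoSuchElementException` once the input is exhausted, as the `Iterator` contract expects.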

[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14695166#comment-14695166
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36969808
  
--- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java 
---
@@ -1057,7 +1061,68 @@ public long count() throws Exception {
public UnionOperator<T> union(DataSet<T> other){
return new UnionOperator<T>(this, other, 
Utils.getCallLocationName());
}
+
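+   // --------------------------------------------------------------------------------------------
+   //  Sample
+   // --------------------------------------------------------------------------------------------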
+   
+   /**
+* Generate a sample of DataSet by the probability fraction of each 
element.
+*
+* @param withReplacement Whether element can be selected more than 
once.
+* @param fraction        Probability that each element is chosen, 
should be [0,1] without replacement,
+*and [0, ∞) with replacement. While fraction 
is larger than 1, the elements are 
+*expected to be selected multi times into 
sample on average.
+* @return The sampled DataSet
+*/
+   public MapPartitionOperator<T, T> sample(final boolean withReplacement, 
final double fraction) {
+   return sample(withReplacement, fraction, Utils.RNG.nextLong());
+   }
+   
+   /**
+* Generate a sample of DataSet by the probability fraction of each 
element.
+*
+* @param withReplacement Whether element can be selected more than 
once.
+* @param fraction        Probability that each element is chosen, 
should be [0,1] without replacement,
+*and [0, ∞) with replacement. While fraction 
is larger than 1, the elements are 
+*expected to be selected multi times into 
sample on average.
+* @param seed            random number generator seed.
+* @return The sampled DataSet
+*/
+   public MapPartitionOperator<T, T> sample(final boolean withReplacement, 
final double fraction, final long seed) {
+   return mapPartition(new SampleWithFraction<T>(withReplacement, 
fraction, seed));
+   }
+   
+   /**
+* Generate a sample of DataSet which contains fixed size elements.
+*
+* @param withReplacement Whether element can be selected more than 
once.
+* @param numSample   The expected sample size.
+* @return The sampled DataSet
+*/
--- End diff --

Maybe we want to include a note that this kind of sampling currently takes 
2 passes over the data, and recommend using fraction unless exact precision is 
necessary.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-13 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14696369#comment-14696369
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r37046195
  
--- Diff: 
flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.scala.operators
+
+import java.util.{List => JavaList, Random}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.util.CollectionDataSets
+import 
org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.Assert._
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{Before, After, Test}
+
+import scala.collection.JavaConverters._
+
+@RunWith(classOf[Parameterized])
+class SampleITCase(mode: TestExecutionMode) extends 
MultipleProgramsTestBase(mode) {
+  private val RNG: Random = new Random
+
+  private var result: JavaList[String] = null;
+
+  @Before
+  def initiate {
+ExecutionEnvironment.getExecutionEnvironment.setParallelism(5)
+  }
+
+  @After
+  def after() = {
+TestBaseUtils.containsResultAsText(result, getSourceStrings)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithoutReplacement {
+verifySamplerWithFractionWithoutReplacement(0d)
+verifySamplerWithFractionWithoutReplacement(0.2d)
+verifySamplerWithFractionWithoutReplacement(1.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithFractionWithReplacement {
+verifySamplerWithFractionWithReplacement(0d)
+verifySamplerWithFractionWithReplacement(0.2d)
+verifySamplerWithFractionWithReplacement(1.0d)
+verifySamplerWithFractionWithReplacement(2.0d)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithoutReplacement {
+verifySamplerWithFixedSizeWithoutReplacement(0)
+verifySamplerWithFixedSizeWithoutReplacement(2)
+verifySamplerWithFixedSizeWithoutReplacement(21)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSamplerWithSizeWithReplacement {
+verifySamplerWithFixedSizeWithReplacement(0)
+verifySamplerWithFixedSizeWithReplacement(2)
+verifySamplerWithFixedSizeWithReplacement(21)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double) {
+verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithoutReplacement(fraction: 
Double, seed: Long) {
+verifySamplerWithFraction(false, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double) {
+verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFractionWithReplacement(fraction: Double, 
seed: Long) {
+verifySamplerWithFraction(true, fraction, seed)
+  }
+
+  @throws(classOf[Exception])
+  private def verifySamplerWithFraction(withReplacement: Boolean, 
fraction: Double, seed: Long) {
+val ds = getSourceDataSet()
+val sampled = ds.sample(withReplacement, fraction, seed)
+result = sampled.collect.asJava
--- End diff --

The validity of the sample result is verified in the after() method for each test. 
As the source data is very small, verifying the fraction does not make much sense, 
so I didn't verify the fraction validity 

[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14694548#comment-14694548
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36934499
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/operators/util/RandomSamplerTest.java
 ---
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This test suite try to verify whether all the random samplers work as 
we expected, which mainly focus on:
+ * <ul>
+ * <li>Does sampled result fit into input parameters? we check parameters 
like sample fraction, sample size,
+ * w/o replacement, and so on.</li>
+ * <li>Does sampled result randomly selected? we verify this by measure 
how much does it distributed on source data.
+ * Run Kolmogorov-Smirnov (KS) test between the random samplers and 
default reference samplers which is distributed
+ * well-proportioned on source data. If random sampler select elements 
randomly from source, it would distributed
+ * well-proportioned on source data as well. The KS test will fail to 
strongly reject the null hypothesis that
+ * the distributions of sampling gaps are the same.
+ * </li>
+ * </ul>
+ *
+ * @see <a 
href="https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test">Kolmogorov 
Smirnov test</a>
+ */
+public class RandomSamplerTest {
+   private final static int SOURCE_SIZE = 1;
+   private static KolmogorovSmirnovTest ksTest;
+   private static List<Double> source;
+   private final static int DEFFAULT_PARTITION_NUMBER=10;
+   private List<Double>[] sourcePartitions = new 
List[DEFFAULT_PARTITION_NUMBER];
+
+   @BeforeClass
+   public static void init() {
+   // initiate source data set.
+   source = new ArrayList<Double>(SOURCE_SIZE);
+   for (int i = 0; i < SOURCE_SIZE; i++) {
+   source.add((double) i);
+   }
+   
+   ksTest = new KolmogorovSmirnovTest();
+   }
+
+   private void initSourcePartition() {
+   for (int i=0; i<DEFFAULT_PARTITION_NUMBER; i++) {
+   sourcePartitions[i] = new LinkedList<Double>();
+   }
+   for (int i = 0; i<SOURCE_SIZE; i++) {
+   int index = i % DEFFAULT_PARTITION_NUMBER;
+   sourcePartitions[index].add((double)i);
+   }
+   }
+   
+   @Test(expected = java.lang.IllegalArgumentException.class)
+   public void testBernoulliSamplerWithUnexpectedFraction1() {
+   verifySamplerFraction(-1, false);
+   }
+   
+   @Test(expected = java.lang.IllegalArgumentException.class)
+   public void testBernoulliSamplerWithUnexpectedFraction2() {
+   verifySamplerFraction(2, false);
+   }
+   
+   @Test
+   public void testBernoulliSamplerFraction() {
+   verifySamplerFraction(0.01, false);
+   verifySamplerFraction(0.05, false);
+   verifySamplerFraction(0.1, false);
+   verifySamplerFraction(0.3, false);
+   verifySamplerFraction(0.5, false);
+   verifySamplerFraction(0.854, false);
+   

[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-12 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14693288#comment-14693288
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36844879
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/operators/util/RandomSamplerTest.java
 ---

[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14692943#comment-14692943
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-130175707
  
Hi, @tillrohrmann , it's ready for review now.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14680223#comment-14680223
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user thvasilo commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-129481478
  
Hello @ChengXiangLi, perhaps 
[this](http://machinelearning.wustl.edu/mlpapers/papers/icml2013_meng13a) paper 
can help with choosing the algorithm for sampling an exact number of 
elements.

It provides an implementation specifically designed for a MapReduce 
environment.


 Create sample operator for Dataset
 --

 Key: FLINK-1901
 URL: https://issues.apache.org/jira/browse/FLINK-1901
 Project: Flink
  Issue Type: Improvement
  Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li

 In order to be able to implement Stochastic Gradient Descent and a number of 
 other machine learning algorithms we need to have a way to take a random 
 sample from a Dataset.
 We need to be able to sample with or without replacement from the Dataset, 
 choose the relative size of the sample, and set a seed for reproducibility.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14681142#comment-14681142
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-129684685
  
Thanks, @thvasilo. That paper introduces a random sampling algorithm that 
extends the one I described before. It uses two thresholds to filter 
elements before sorting: if an element's weight is larger than the upper 
threshold, it is included in the final top K elements with very high 
probability; if its weight is smaller than the lower threshold, it is 
excluded from the final top K elements with very high probability. At an 
accepted failure probability, we can decide directly on the elements whose 
weight is above the upper threshold or below the lower threshold, and sort 
only the elements whose weight lies between the thresholds.
 
This is a very good algorithm, and I will add it to my notes as a further 
improvement, but I don't want to implement it right away. This PR is already 
large enough, so I would like to leave all algorithm optimizations for the 
future and keep only the basic implementations of the sampling algorithms 
here, making sure they are simple, easy to understand, and correct, so that 
they can serve as the performance baseline for further improvements.
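
The two-threshold filtering described above can be sketched roughly as follows. This is a simplified illustration, not the algorithm from the paper: the class name `ThresholdFilterSample` is hypothetical, and the `lower` and `upper` thresholds are passed in as assumptions rather than derived from the sample size and data size as the paper does.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of threshold-filtered top-K sampling without replacement. */
final class ThresholdFilterSample {
    /** An element paired with its random weight. */
    private static final class Weighted<T> {
        final double weight;
        final T element;

        Weighted(double weight, T element) {
            this.weight = weight;
            this.element = element;
        }
    }

    /**
     * Draws k elements. Weights above `upper` are accepted directly, weights
     * below `lower` are rejected directly, and only the rest is sorted.
     * Both thresholds are assumed inputs here.
     */
    static <T> List<T> sample(List<T> data, int k, double lower, double upper, Random random) {
        List<T> accepted = new ArrayList<>();
        List<Weighted<T>> waitlist = new ArrayList<>();
        for (T element : data) {
            double weight = random.nextDouble();
            if (weight > upper) {
                accepted.add(element);                          // almost surely in the top k
            } else if (weight >= lower) {
                waitlist.add(new Weighted<>(weight, element));  // undecided, sorted later
            }
            // weight < lower: almost surely not in the top k, dropped immediately
        }
        if (accepted.size() > k || accepted.size() + waitlist.size() < k) {
            // The thresholds were too loose or too tight for this run.
            throw new IllegalStateException("thresholds failed, retry with other thresholds");
        }
        // Only the waitlist is sorted, by descending weight.
        waitlist.sort(Comparator.comparingDouble((Weighted<T> w) -> w.weight).reversed());
        List<T> result = new ArrayList<>(accepted);
        for (int i = 0; result.size() < k; i++) {
            result.add(waitlist.get(i).element);
        }
        return result;
    }
}
```

With `lower = 0.0` and `upper = 1.0` nothing is decided early and the method degenerates to the plain "sort all weights, take the top K" approach; tighter thresholds shrink the sorted waitlist at the cost of a small failure probability.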




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-10 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14679955#comment-14679955
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-129409569
  
Thanks for the input, @tillrohrmann and @sachingoel0101. I would like to 
implement fixed-size sampling with only one pass through the source dataset. 
When users sample a dataset, it is usually quite large, so passing through 
it multiple times would add considerable cost. The basic idea of my solution 
for fixed-size sampling on a distributed stream is: generate a random number 
for each input element as its weight, then select the top K elements by 
weight. Because the weights are generated randomly, the top K elements are 
a random selection. You can find more details in the code and Javadoc.
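
A minimal sketch of this random-weight, top-K idea follows, using plain Java collections in place of Flink operators. The class and method names (`WeightedTopKSample`, `samplePartition`, `merge`) are hypothetical and not taken from the pull request; the sketch assumes each partition uses its own independent random number generator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

/** Hypothetical sketch of fixed-size sampling without replacement via random weights. */
final class WeightedTopKSample {
    /** An element paired with its random weight. */
    static final class Weighted<T> {
        final double weight;
        final T element;

        Weighted(double weight, T element) {
            this.weight = weight;
            this.element = element;
        }
    }

    /** Phase 1, once per partition: keep the k elements with the largest weights. */
    static <T> List<Weighted<T>> samplePartition(Iterator<T> input, int k, Random random) {
        // Min-heap on weight: the head is the weakest candidate kept so far.
        PriorityQueue<Weighted<T>> heap = new PriorityQueue<>(
                Math.max(1, k), Comparator.comparingDouble((Weighted<T> w) -> w.weight));
        while (input.hasNext()) {
            T element = input.next();
            double weight = random.nextDouble();
            if (heap.size() < k) {
                heap.add(new Weighted<>(weight, element));
            } else if (k > 0 && weight > heap.peek().weight) {
                heap.poll();
                heap.add(new Weighted<>(weight, element));
            }
        }
        return new ArrayList<>(heap);
    }

    /** Phase 2, in a single task: merge the per-partition candidates, keep the top k. */
    static <T> List<T> merge(List<List<Weighted<T>>> partials, int k) {
        List<Weighted<T>> all = new ArrayList<>();
        for (List<Weighted<T>> partial : partials) {
            all.addAll(partial);
        }
        all.sort(Comparator.comparingDouble((Weighted<T> w) -> w.weight).reversed());
        List<T> result = new ArrayList<>();
        for (int i = 0; i < Math.min(k, all.size()); i++) {
            result.add(all.get(i).element);
        }
        return result;
    }
}
```

The global top K is always contained in the union of the per-partition top K lists, which is why each partition only needs to forward at most K candidates and the source is read once.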




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14662173#comment-14662173
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user sachingoel0101 commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-128781613
  
I have worked on this problem before. The idea is to divide the data into 
blocks and find the probability of selecting an element from each block.
Suppose there are blocks B_1, B_2, ..., B_N with probabilities P_1, 
P_2, ..., P_N. You sample k points by first sampling from the distribution 
{P_1, P_2, ..., P_N} to find the number of elements required from each 
block. After that, you select the required number of points from each block and 
take the union.
This is pretty easy to implement in a shared-memory system, but hard in a 
distributed system. I tried the following approach some time 
ago, although I didn't finish working on it:
blockedData = Data -> (block_id, data)

blockNumbers = (block_id, data) -> (block_id, count)

(1...k) -> (list of block ids we'll be sampling from)
After this, I tried broadcasting the list and selecting the required number 
of elements from each block, which can be done quite easily. But what if k is 
very large?
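
The block-based scheme can be sketched in a shared-memory setting like this. It is an illustration under stated assumptions: the class `BlockSample` is hypothetical, sampling is with replacement, and each block's probability P_i is assumed to be proportional to its size, which the comment does not state explicitly.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of block-based sampling with replacement. */
final class BlockSample {
    /** For each of the k draws, pick a block with probability proportional to its size. */
    static int[] drawsPerBlock(int[] blockSizes, int k, Random random) {
        int total = 0;
        for (int size : blockSizes) {
            total = Math.addExact(total, size);
        }
        if (k > 0 && total == 0) {
            throw new IllegalArgumentException("cannot sample from empty data");
        }
        int[] draws = new int[blockSizes.length];
        for (int i = 0; i < k; i++) {
            int r = random.nextInt(total);
            int block = 0;
            // Walk the cumulative sizes; empty blocks are skipped automatically.
            while (r >= blockSizes[block]) {
                r -= blockSizes[block];
                block++;
            }
            draws[block]++;
        }
        return draws;
    }

    /** Select the required number of elements from each block and take the union. */
    static <T> List<T> sample(List<List<T>> blocks, int k, Random random) {
        int[] sizes = new int[blocks.size()];
        for (int b = 0; b < blocks.size(); b++) {
            sizes[b] = blocks.get(b).size();
        }
        int[] draws = drawsPerBlock(sizes, k, random);
        List<T> result = new ArrayList<>(k);
        for (int b = 0; b < blocks.size(); b++) {
            List<T> block = blocks.get(b);
            for (int d = 0; d < draws[b]; d++) {
                result.add(block.get(random.nextInt(block.size())));
            }
        }
        return result;
    }
}
```

Only the per-block counts (one integer per block) need to be shared, not a list of k block ids, which is one possible answer to the "what if k is very large" concern in a distributed setting.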




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661768#comment-14661768
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-128690389
  
The current state with the `RichMapPartitionFunctions` looks good to me 
:+1: 

You're right that users usually want to fix the size of the whole sample. 
An easy solution could be to assign each item an index, see 
`DataSetUtils.zipWithIndex`. Then we can compute the maximum index (which is 
effectively counting the data set elements). This gives us the range from which 
we have to sample. By generating a parallel sequence as large as our sample 
size with `env.generateSequence(maxIndex)`, we could then sample from `[0, 
maxIndex]`. Finally, we would have to join this data set with the original data 
set which has the indices assigned. There are probably more efficient 
algorithms out there than this one.

Just ping me when you've found a solution for the problem. Looking forward 
to reviewing it :-)
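
The index-and-join approach outlined above can be sketched with plain Java collections standing in for the Flink operators. The class `IndexJoinSample` is hypothetical, a `HashMap` plays the role of the indexed data set, and the sketch samples with replacement; none of this is Flink API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;

/** Hypothetical sketch of fixed-size sampling by index assignment and join. */
final class IndexJoinSample {
    static <T> List<T> sample(List<T> data, int k, long seed) {
        // Step 1: assign each item an index (stand-in for zipWithIndex).
        Map<Long, T> indexed = new HashMap<>();
        long nextIndex = 0;
        for (T element : data) {
            indexed.put(nextIndex++, element);
        }
        // Step 2: the maximum index, which effectively counts the elements.
        long maxIndex = nextIndex - 1;
        if (k > 0 && maxIndex < 0) {
            throw new IllegalArgumentException("cannot sample from empty data");
        }
        // Step 3: generate k random indices from [0, maxIndex].
        Random random = new Random(seed);
        List<Long> drawn = new ArrayList<>(k);
        for (int i = 0; i < k; i++) {
            drawn.add((long) random.nextInt(data.size()));
        }
        // Step 4: join the drawn indices with the indexed data.
        List<T> result = new ArrayList<>(k);
        for (Long index : drawn) {
            result.add(indexed.get(index));
        }
        return result;
    }
}
```

Counting the elements and then joining means the data is touched more than once, which matches the remark that more efficient algorithms probably exist.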




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-06 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14661258#comment-14661258
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-128580607
  
Hi @tillrohrmann, the current implementation of fixed-size sampling 
generates a fixed-size random sample for each partition instead of for the whole 
dataset, while users probably expect the latter most of the time. I'm researching 
how to sample a fixed number of elements randomly from a distributed data stream. I 
think we can pause this PR review until I have merged that fix.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14654757#comment-14654757
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36267430
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A simple in memory implementation of Reservoir Sampling with replacement, and with only one pass through
+ * the input iteration whose size is unpredictable.
+ * This implementation refers to the algorithm described in Reservoir-based Random Sampling with Replacement
+ * from Data Stream.
+ *
+ * @param <T> the type of sample.
+ */
+public class ReservoirSamplerWithReplacement<T> extends RandomSampler<T> {
+   private final int numSamples;
+   private final Random random;
+   private PoissonDistribution poissonDistribution;
+   private List<Integer> positions;
+   
+   /**
+    * Create a reservoir sampler with fixed sample size and default random number generator.
+    *
+    * @param numSamples number of samples to retain in reservoir, must be non-negative.
+    */
+   public ReservoirSamplerWithReplacement(int numSamples) {
+       this(numSamples, new Random());
+   }
+   
+   /**
+    * Create a reservoir sampler with fixed sample size and random number generator seed.
+    *
+    * @param numSamples number of samples to retain in reservoir, must be non-negative.
+    * @param seed   random number generator seed
+    */
+   public ReservoirSamplerWithReplacement(int numSamples, long seed) {
+       this(numSamples, new Random(seed));
+   }
+   
+   /**
+    * Create a reservoir sampler with fixed sample size and random number generator.
+    *
+    * @param numSamples number of samples to retain in reservoir, must be non-negative.
--- End diff --

Fixed.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-08-04 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14654760#comment-14654760
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r36267541
  
--- Diff: pom.xml ---
@@ -224,6 +224,12 @@ under the License.
         <version>3.2.1</version>
     </dependency>
 
+    <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-math3</artifactId>
+        <version>3.5</version>
+    </dependency>
--- End diff --

I couldn't find the flink-dist/NOTICE and flink-dist/LICENSE files. There are 
NOTICE and LICENSE files in the Flink root directory, but I couldn't find the 
right place in them to add an entry for commons-math3, so I'll leave this to 
you, Till.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649351#comment-14649351
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984921
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/RandomSampler.java
 ---
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import java.util.Iterator;
+
+public abstract class RandomSampler<T> {
--- End diff --

JavaDocs are missing




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649332#comment-14649332
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984232
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.Iterator;
+import java.util.Random;
+
+/**
+ * A sampler implementation built upon Bernoulli Trail. For sample without replacement, each element sample choice is just a bernoulli trail.
+ *
+ * @param <T> The type of sample.
+ */
+public class BernoulliSampler<T> extends RandomSampler<T> {
+   
+   private final double fraction;
+   private final Random random;
+   
+   /**
+    * Create a bernoulli sampler sample fraction and default random number generator.
+    *
+    * @param fraction sample fraction, aka the bernoulli sampler possibility.
+    */
+   public BernoulliSampler(double fraction) {
+       this(fraction, new Random());
+   }
+   
+   /**
+    * Create a bernoulli sampler sample fraction and random number generator seed.
+    *
+    * @param fraction sample fraction, aka the bernoulli sampler possibility.
+    * @param seed random number generator seed.
+    */
+   public BernoulliSampler(double fraction, long seed) {
+       this(fraction, new Random(seed));
+   }
+   
+   /**
+    * Create a bernoulli sampler sample fraction and random number generator.
+    *
+    * @param fraction sample fraction, aka the bernoulli sampler possibility.
+    * @param random   the random number generator.
+    */
+   public BernoulliSampler(double fraction, Random random) {
+       Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction fraction must between [0, 1].");
+       this.fraction = fraction;
+       this.random = random;
+   }
+   
+   /**
+    * Sample the input elements, for each input element, take a Bernoulli Trail for sample.
+    *
+    * @param input elements to be sampled.
+    * @return the sampled result which is lazy computed upon input elements.
+    */
+   @Override
+   public Iterator<T> sample(final Iterator<T> input) {
+       if (fraction == 0) {
+           return EMPTY_ITERABLE;
+       }
+       
+       return new SampledIterator<T>() {
+           T current;
+           
+           @Override
+           public boolean hasNext() {
+               if (current == null) {
+                   while (input.hasNext()) {
+                       T element = input.next();
+                       if (random.nextDouble() <= fraction) {
+                           current = element;
+                           return true;
+                       }
+                   }
+                   current = null;
+                   return false;
+               }
+               return false;
--- End diff --

I think, if I'm not mistaken, that `hasNext` has to be idempotent. Thus it 
should return `true` if `current != null`.
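
A minimal Java sketch of the behaviour described above: `hasNext` buffers the next sampled element and keeps returning `true` until `next` consumes it. The class name `IdempotentSampledIterator` is hypothetical and not part of the PR, and the sketch assumes the input contains no `null` elements, because `null` marks an empty buffer here as in the quoted diff.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;

// Hypothetical sketch, not code from the PR.
final class IdempotentSampledIterator<T> implements Iterator<T> {
    private final Iterator<T> input;
    private final double fraction;
    private final Random random;
    private T current; // buffered element; null means nothing is buffered

    IdempotentSampledIterator(Iterator<T> input, double fraction, Random random) {
        this.input = input;
        this.fraction = fraction;
        this.random = random;
    }

    @Override
    public boolean hasNext() {
        if (current != null) {
            return true; // repeated calls must not consume more input
        }
        while (input.hasNext()) {
            T element = input.next();
            if (random.nextDouble() <= fraction) {
                current = element;
                return true;
            }
        }
        return false;
    }

    @Override
    public T next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        T result = current;
        current = null; // clear the buffer so the next hasNext() advances
        return result;
    }
}
```

With this shape, calling `hasNext` any number of times between two `next` calls neither skips elements nor changes the result.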



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649419#comment-14649419
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35988629
  
--- Diff: pom.xml ---
@@ -224,6 +224,12 @@ under the License.
         <version>3.2.1</version>
     </dependency>
 
+    <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-math3</artifactId>
+        <version>3.5</version>
+    </dependency>
--- End diff --

For that we have to add an entry in the `flink-dist/NOTICE` and 
`flink-dist/LICENSE` files. But I can do that when merging the PR.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649341#comment-14649341
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984449
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.Iterator;
+
+/**
+ * A sampler implementation based on Poisson Distribution. While sample elements with replacement,
+ * the picked number of each element follow poisson distribution, so we could use poisson distribution
--- End diff --

Typo: element follows a given poisson distribution
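
To illustrate the idea in the quoted Javadoc, here is a small Java sketch of sampling with replacement by drawing, for each element, a Poisson-distributed number of copies with mean equal to the sample fraction. The diff imports `PoissonDistribution` from commons-math3; to stay self-contained this sketch swaps in Knuth's multiplication method instead, which is an assumption of the example. The names `PoissonSamplingSketch`, `nextPoisson` and `sample` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

// Hypothetical sketch, not code from the PR.
final class PoissonSamplingSketch {

    /** Draw a Poisson(mean) variate with Knuth's method; suitable for small means. */
    static int nextPoisson(double mean, Random random) {
        double limit = Math.exp(-mean);
        double product = random.nextDouble();
        int count = 0;
        while (product > limit) {
            count++;
            product *= random.nextDouble();
        }
        return count;
    }

    /** Emit each input element as many times as its Poisson draw says. */
    static <T> List<T> sample(Iterator<T> input, double fraction, Random random) {
        List<T> out = new ArrayList<>();
        while (input.hasNext()) {
            T element = input.next();
            int copies = nextPoisson(fraction, random);
            for (int i = 0; i < copies; i++) {
                out.add(element);
            }
        }
        return out;
    }
}
```

The expected output size is `fraction` times the input size, and a fraction above 1 is meaningful here because an element may be picked more than once.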




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649337#comment-14649337
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35984385
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java
 ---
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.Iterator;
+
+/**
+ * A sampler implementation based on Poisson Distribution. While sample elements with replacement,
--- End diff --

While sampling elements




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649372#comment-14649372
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35985704
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithoutReplacement.java
 ---
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A simple in memory implementation of Reservoir Sampling without replacement, and with only one pass through
+ * the input iteration whose size is unpredictable.
+ * This implementation refers to the Algorithm R described in Random Sampling with a Reservoir Vitter, 1985.
--- End diff --

Maybe add again a link to the paper if available.
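
For readers without the paper at hand, Algorithm R as referenced in the quoted Javadoc can be sketched in a few lines of Java: fill the reservoir with the first `numSamples` elements, then let the i-th element replace a uniformly chosen slot with probability `numSamples / i`. This is an illustrative sketch only. The class name `AlgorithmRSketch` is hypothetical and the code is not the PR's `ReservoirSamplerWithoutReplacement`.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

// Hypothetical sketch, not code from the PR.
final class AlgorithmRSketch {

    /** One pass over an input of unknown size, keeping at most numSamples elements. */
    static <T> List<T> sample(Iterator<T> input, int numSamples, Random random) {
        List<T> reservoir = new ArrayList<>(numSamples);
        long seen = 0;
        while (input.hasNext()) {
            T element = input.next();
            seen++;
            if (reservoir.size() < numSamples) {
                reservoir.add(element); // the first numSamples elements fill the reservoir
            } else {
                // Pick an index uniformly in [0, seen); the element is kept only
                // if that index falls inside the reservoir.
                long j = (long) (random.nextDouble() * seen);
                if (j < numSamples) {
                    reservoir.set((int) j, element);
                }
            }
        }
        return reservoir;
    }
}
```

If the input has fewer than `numSamples` elements, the result simply contains all of them.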




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649358#comment-14649358
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35985033
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A simple in memory implementation of Reservoir Sampling with replacement, and with only one pass through
+ * the input iteration whose size is unpredictable.
+ * This implementation refers to the algorithm described in Reservoir-based Random Sampling with Replacement
+ * from Data Stream.
--- End diff --

Maybe put a link to the paper in here.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649360#comment-14649360
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/949#discussion_r35985116
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * A simple in memory implementation of Reservoir Sampling with replacement, and with only one pass through
+ * the input iteration whose size is unpredictable.
+ * This implementation refers to the algorithm described in Reservoir-based Random Sampling with Replacement
+ * from Data Stream.
+ *
+ * @param <T> the type of sample.
+ */
+public class ReservoirSamplerWithReplacement<T> extends RandomSampler<T> {
+   private final int numSamples;
+   private final Random random;
+   private PoissonDistribution poissonDistribution;
+   private List<Integer> positions;
+   
+   /**
+    * Create a reservoir sampler with fixed sample size and default random number generator.
+    *
+    * @param numSamples number of samples to retain in reservoir, must be non-negative.
+    */
+   public ReservoirSamplerWithReplacement(int numSamples) {
+       this(numSamples, new Random());
+   }
+   
+   /**
+    * Create a reservoir sampler with fixed sample size and random number generator seed.
+    *
+    * @param numSamples number of samples to retain in reservoir, must be non-negative.
+    * @param seed   random number generator seed
+    */
+   public ReservoirSamplerWithReplacement(int numSamples, long seed) {
+       this(numSamples, new Random(seed));
+   }
+   
+   /**
+    * Create a reservoir sampler with fixed sample size and random number generator.
+    *
+    * @param numSamples number of samples to retain in reservoir, must be non-negative.
--- End diff --

@param is missing




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-31 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14649429#comment-14649429
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-126740651
  
Thanks for your contribution @ChengXiangLi. The code is really well tested 
and well structured. Great work :-)

I had only some minor comments. There is however one thing I'm not so sure 
about. With the current implementation, all parallel tasks of the sampling 
operator will get the same random generator/seed value. Thus, every node will 
generate the same sequence of random numbers. I think this can have a negative 
influence on the sampling. What we could do is to use 
`RichMapPartitionFunction` instead of the `MapPartitionFunction`. With the rich 
function, we either have access to the subtask index, given by 
`getRuntimeContext().getIndexOfThisSubtask()`,  which we could use to modify 
the initial seed or we generate the random number generator in the `open` 
method (this method is executed on the TaskManager). Assuming that the clocks 
are not completely synchronized and that the individual tasks will be 
instantiated not at the same time, this could give us less correlated random 
number sequences. What do you think? 
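
A minimal sketch of the first option, in plain Java so that it runs without Flink: each parallel task derives its own generator from the user seed and its subtask index. In a `RichMapPartitionFunction` the index would come from `getRuntimeContext().getIndexOfThisSubtask()` as mentioned above. The helper name `SubtaskSeeds.forSubtask` is hypothetical, and adding the index to the seed is only the simplest possible mixing.

```java
import java.util.Random;

// Hypothetical sketch, not code from the PR.
final class SubtaskSeeds {

    /**
     * Derive a per-subtask generator so that parallel tasks do not replay
     * the same random sequence, while a fixed (seed, index) pair still
     * gives a repeatable sequence.
     */
    static Random forSubtask(long seed, int subtaskIndex) {
        return new Random(seed + subtaskIndex);
    }
}
```

Usage would look like `Random random = SubtaskSeeds.forSubtask(seed, index);` inside `open`. Results are then reproducible only if each subtask also sees the same input on every run.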




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-30 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14647517#comment-14647517
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user tillrohrmann commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-126296058
  
Thanks for your contribution @ChengXiangLi. I'll try to review your PR 
tomorrow morning.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14645658#comment-14645658
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

GitHub user ChengXiangLi opened a pull request:

https://github.com/apache/flink/pull/949

[FLINK-1901] [core] Create sample operator for Dataset.

This PR includes:
1. Four random sampler implementations for different sampling strategies.
2. A sample operator for the DataSet Java API.
3. Unit tests for the random samplers.
4. An integration test for the sample operator Java API.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ChengXiangLi/flink FLINK-1901

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/949.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #949


commit f7ba8779b8d6a6d66ab5d4e2435a70e220b1e0fc
Author: chengxiang li chengxiang...@intel.com
Date:   2015-07-22T03:38:13Z

[FLINK-1901] [core] Create sample operator for Dataset.






[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14645803#comment-14645803
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-125898830
  
Previously, I planned to leave the sample Scala API to a separate PR, as I am 
not very familiar with Scala. However, the failed test shows that Flink has a 
test to make sure the Scala and Java APIs stay the same, so I will try to add 
the Scala API and its integration test later.




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-29 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14647192#comment-14647192
 ] 

ASF GitHub Bot commented on FLINK-1901:
---

Github user ChengXiangLi commented on the pull request:

https://github.com/apache/flink/pull/949#issuecomment-126184403
  
The Scala API and its integration test have been merged into the latest commit. 
@tillrohrmann do you have time to review this PR?




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-23 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638272#comment-14638272
 ] 

Chengxiang Li commented on FLINK-1901:
--

Thanks, [~till.rohrmann], I get it now. This looks more like an iteration 
optimization issue to me: it assumes that the output of a static code path is 
always the same, so it caches the output for a potential performance 
improvement. But this assumption does not always hold, for example for a static 
code path with a random sampling operator, or a data source that reads from 
HBase. I think we could open a separate JIRA to address it in a uniform way 
instead of treating random sampling as a special case in this JIRA.



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-23 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638429#comment-14638429
 ] 

Till Rohrmann commented on FLINK-1901:
--

That's a good idea to break down the task. Do you want to take the lead 
[~chengxiang li]?



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-23 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638448#comment-14638448
 ] 

Till Rohrmann commented on FLINK-1901:
--

Currently, whether an operator is on a dynamic path is decided by looking at 
the inputs of the operator. If they are dynamic, so is the current operator. 
The iteration {{DataSets}}, {{WorksetPlaceHolder}}, 
{{SolutionSetPlaceHolder}} and {{PartialSolutionPlaceHolder}}, are always 
dynamic. One idea is to allow other operators to be declared dynamic as well. 
That way they can also start a dynamic path. Afterwards, we have to make sure 
that not only the iteration {{DataSets}} get an {{IterationHead}} prepended, 
which kicks off the iterations, but also all the other operators which start a 
dynamic path.
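
As a rough illustration of the rule described above, the following Python sketch marks operators as dynamic by propagating the flag along the inputs. It is not Flink code: the function `dynamic_operators`, the plan layout, and the operator names are hypothetical, and the sketch assumes that one dynamic input is enough to make an operator dynamic.

```python
from typing import Dict, List, Set


def dynamic_operators(inputs: Dict[str, List[str]], roots: Set[str]) -> Set[str]:
    """Return every operator that lies on a dynamic path.

    `inputs` maps an operator name to the names of its input operators.
    `roots` holds operators that are dynamic by declaration (for example
    the iteration placeholders). Assumption: an operator is dynamic if it
    is a root or if at least one of its inputs is dynamic.
    """
    dynamic = set(roots)
    changed = True
    while changed:  # propagate until no new operator gets marked
        changed = False
        for op, ins in inputs.items():
            if op not in dynamic and any(i in dynamic for i in ins):
                dynamic.add(op)
                changed = True
    return dynamic


# Hypothetical plan: a sample over static training data feeds a gradient
# step that also reads the iteration's partial solution (the weights).
plan = {
    "training_data": [],
    "sample": ["training_data"],
    "weights": [],
    "gradient": ["sample", "weights"],
}

# Only the iteration placeholder is dynamic: the sample stays static.
only_placeholders = dynamic_operators(plan, {"weights"})

# The sample is additionally declared dynamic and starts its own path.
with_declared_sample = dynamic_operators(plan, {"weights", "sample"})
```

In the first call the `sample` operator stays off the dynamic path, which matches the caching problem discussed in this thread; declaring it dynamic in the second call puts it on the path.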



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-23 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14638617#comment-14638617
 ] 

Chengxiang Li commented on FLINK-1901:
--

Thanks for the analysis, [~trohrm...@apache.org]. I've created FLINK-2396 as 
follow-up work to support sample and other similar datasets in iterations.



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-22 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636451#comment-14636451
 ] 

Till Rohrmann commented on FLINK-1901:
--

If you use the sampling operator this way, it works. However, usually your 
iteration data set is something like the weight vector of your model and you 
have another training dataset from which you want to take a small sample to 
update your weight vector in each iteration (e.g. SGD). When you write a 
program like that, then you'll see that the output of the sampling operator 
will always be the same (for every iteration). The reason is that the sampling 
is no longer on the dynamic path of the iteration and thus it is calculated 
only once and then cached. This is not the intended behaviour, though.
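
The effect can be imitated outside of Flink with a few lines of Python. This is only a sketch under stated assumptions: `bernoulli_sample` is a hypothetical helper, the plain `for` comprehension stands in for the iteration, and "cached" simply means the sample is computed once before the loop.

```python
import random


def bernoulli_sample(items, fraction, rng):
    """Keep each item independently with probability `fraction`."""
    return [x for x in items if rng.random() < fraction]


training_data = list(range(1000))
rng = random.Random(42)

# Static path: the sample is computed once and reused in every iteration.
cached = bernoulli_sample(training_data, 0.1, rng)
static_batches = [cached for _ in range(3)]

# Dynamic path: the sample is recomputed in every iteration.
dynamic_batches = [bernoulli_sample(training_data, 0.1, rng) for _ in range(3)]
```

All three entries of `static_batches` are the same mini-batch, while `dynamic_batches` holds a fresh sample per iteration, which is what SGD needs.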



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14635206#comment-14635206
 ] 

Till Rohrmann commented on FLINK-1901:
--

I think this solution is indeed a little bit too hacky. It would be very 
unintuitive for the user to have to broadcast the iteration {{DataSet}} to the 
sampling operator. Furthermore, this will inflict unnecessary network I/O.

I think we should try to solve this problem properly. This means that we have a 
single sampling operator which works inside and outside of iterations. This 
will also avoid code duplication.



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636124#comment-14636124
 ] 

Chengxiang Li commented on FLINK-1901:
--

Sampling where every point is sampled with probability 1/N is one of the 
sampling cases (sampling with fraction, without replacement). There are 3 other 
kinds of sampling that are commonly used as well: sampling with fraction, with 
replacement; sampling with fixed size, without replacement; and sampling with 
fixed size, with replacement. We should support all of them when we expose a 
sampling operator to the user.
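
For the two fixed-size cases named above, a minimal single-pass sketch in Python could look as follows. It is an illustration, not the operator from the pull request: the function names are hypothetical, and the with-replacement variant simply runs k independent size-1 reservoirs, which is one of several possible approaches.

```python
import random


def reservoir_sample(stream, k, rng):
    """Fixed size, without replacement (Algorithm R), input size unknown."""
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            reservoir.append(item)  # fill the reservoir first
        else:
            j = rng.randint(0, i)  # inclusive on both ends
            if j < k:
                reservoir[j] = item  # replace with probability k / (i + 1)
    return reservoir


def reservoir_sample_with_replacement(stream, k, rng):
    """Fixed size, with replacement: k independent size-1 reservoirs."""
    slots = [None] * k  # stays None only if the stream is empty
    for i, item in enumerate(stream):
        for s in range(k):
            # Each slot takes the new item with probability 1 / (i + 1).
            if rng.randint(0, i) == 0:
                slots[s] = item
    return slots
```

If the input holds fewer than k items, `reservoir_sample` returns all of them, while the with-replacement variant still returns k entries.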



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636332#comment-14636332
 ] 

Chengxiang Li commented on FLINK-1901:
--

I wrote a simple example of the sampling operator 
[here|https://github.com/ChengXiangLi/flink/blob/FLINK-1901/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/TestSample.java]. 
It works as expected inside or outside of an iteration (for example, with 1000 
items and a sample fraction of 0.5, the output contains around 125 items after 
3 iterations). [~trohrm...@apache.org], I'm not sure whether I understand you 
correctly about sampling inside an iteration; it looks the same to me if it is 
the case shown in the example.
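
The figure of around 125 items follows from 1000 * 0.5^3 = 125. The Python sketch below reproduces that arithmetic with a plain loop; it does not reproduce the linked TestSample.java, and `bernoulli_sample` is a hypothetical helper standing in for the operator.

```python
import random


def bernoulli_sample(items, fraction, rng):
    """Keep each item independently with probability `fraction`."""
    return [x for x in items if rng.random() < fraction]


rng = random.Random(2015)
data = list(range(1000))

# Sample the iteration data set itself three times in a row.
for _ in range(3):
    data = bernoulli_sample(data, 0.5, rng)

expected = 1000 * 0.5 ** 3  # expected number of surviving items
```

Here the sampled data set is the iteration data set itself, so each pass halves it in expectation and `len(data)` ends up close to `expected`.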



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634622#comment-14634622
 ] 

Chengxiang Li commented on FLINK-1901:
--

To randomly choose a sample from a DataSet S, there are basically two kinds of 
sampling requirement: sampling with a factor (such as randomly choosing 5% of 
the items in S) and sampling with a fixed size (such as randomly choosing 100 
items from S). Besides, we do not know the size of S unless we pay the extra 
cost of computing it through DataSet::count().
# Sampling with factor
#* With replacement: the number of times each item is selected approximately 
follows a [Poisson 
Distribution|https://en.wikipedia.org/wiki/Poisson_distribution] in this case, 
so Poisson Sampling can be used to choose the sample items.
#* Without replacement: during sampling, we can treat the selection of each 
item in the iterator as a 
[Bernoulli Trial|https://en.wikipedia.org/wiki/Bernoulli_trial].
# Sampling with fixed size
#* Use DataSet::count() to get the dataset size; together with the fixed size, 
we can turn this into sampling with factor.
#* [Reservoir Sampling|https://en.wikipedia.org/wiki/Reservoir_sampling] is 
another commonly used algorithm to randomly choose a sample of k items from a 
list S containing n items, where n is either very large or unknown. There are 
different reservoir sampling algorithms that support both sampling with 
replacement and sampling without replacement.
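
The two sampling-with-factor variants in the list above can be sketched in plain Python as follows. This is an illustration under stated assumptions rather than the eventual Flink implementation: all function names are hypothetical, and the Poisson draw uses Knuth's multiplication method, which is only practical for small rates such as a sampling fraction.

```python
import math
import random


def poisson(lam, rng):
    """Draw from Poisson(lam) with Knuth's multiplication method."""
    threshold = math.exp(-lam)
    count, product = 0, rng.random()
    while product > threshold:
        count += 1
        product *= rng.random()
    return count


def sample_with_replacement(items, fraction, rng):
    """Poisson sampling: emit each item Poisson(fraction) times."""
    out = []
    for x in items:
        out.extend([x] * poisson(fraction, rng))
    return out


def sample_without_replacement(items, fraction, rng):
    """Bernoulli sampling: one independent trial per item."""
    return [x for x in items if rng.random() < fraction]
```

Both functions need only one pass and no knowledge of the data set size, and passing a `random.Random` created from the same seed reproduces the same sample on the same input order.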




[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634664#comment-14634664
 ] 

Till Rohrmann commented on FLINK-1901:
--

Hi Chengxiang,

good to hear that you want to work on this. I can assign you the ticket. 
However, it is not only about the sampling strategy but also about the 
integration within Flink. The reason is that we have to make sure that the 
sampling operator also works within iterations. This means that it has to be 
part of the dynamic path so that it is triggered for every iteration again and 
again. This will need a special operator type.

But you can start with the sampling strategies and then continue with the 
iteration integration.



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634800#comment-14634800
 ] 

Till Rohrmann commented on FLINK-1901:
--

Oh I forgot. Sorry.

What about the iteration support?



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634797#comment-14634797
 ] 

Sachin Goel commented on FLINK-1901:


[~trohrm...@apache.org], this is already a part of the k-means initialization 
schemes [Most of it anyway].



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634826#comment-14634826
 ] 

Till Rohrmann commented on FLINK-1901:
--

To be honest, I doubt that the sampling is executed repeatedly if it's not the 
iteration data set from which you're sampling. If you use map and reduce 
operations which lie on the static path, then they will be executed once and 
their results cached. But it is best to check the samples.

If it is possible to create a separate PR out of it, then it would be great. 
Makes reviewing much easier.



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634802#comment-14634802
 ] 

Sachin Goel commented on FLINK-1901:


I didn't think there would be any such issue since the sampling itself uses Map 
and Reduce operations. I can check if there are any issues. It certainly gave 
really good results, which wouldn't happen if it weren't being executed 
repeatedly.



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634822#comment-14634822
 ] 

Chengxiang Li commented on FLINK-1901:
--

Hi, [~sachingoel0101], I didn't find any class related to sampling when 
searching the project by keyword. Is the PR you mentioned ongoing now? 
Besides ML algorithms, there should be other use cases that depend on a 
sampling operation, such as range partitioning, and I believe the sample 
operation itself is a common operation which users may use directly.



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Chengxiang Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634628#comment-14634628
 ] 

Chengxiang Li commented on FLINK-1901:
--

Hi, [~tvas], I would like to contribute to this issue if no one else is 
working on it now.



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14635102#comment-14635102
 ] 

Sachin Goel commented on FLINK-1901:


Yes, I do agree. I'll see if what I've written can work with iterations out of 
the box. Fingers crossed.
And I meant: in random sampling, every point is sampled with probability 1/N. I 
was wondering about cases where this probability is not constant across all 
elements, but is instead a function of the element itself. I guess that's too 
specific to be a feature in itself.
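
To make the idea concrete, here is a small Python sketch of sampling where the selection probability is a function of the element. It is purely illustrative: `sample_by_probability` and the `probability_of` callback are hypothetical names, not something proposed in the pull request.

```python
import random


def sample_by_probability(items, probability_of, rng):
    """Keep each item x independently with probability probability_of(x)."""
    return [x for x in items if rng.random() < probability_of(x)]


rng = random.Random(0)
data = list(range(10))

# Extreme case for illustration: even numbers are always kept,
# odd numbers are never kept.
evens = sample_by_probability(data, lambda x: 1.0 if x % 2 == 0 else 0.0, rng)

# Uniform sampling is the special case of a constant function.
nothing = sample_by_probability(data, lambda x: 0.0, rng)
```

With a constant `probability_of` this reduces to sampling with a fraction, without replacement.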



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Sachin Goel (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14635082#comment-14635082
 ] 

Sachin Goel commented on FLINK-1901:


Okay. So I checked the whole code and well, the random sampling I'm using there 
is never used inside an iteration. So as far as that goes, there are no 
problems. However, it would certainly be good to have a separate random 
sampling module, which can work on any data set, for that matter.
[~trohrm...@apache.org], do you think there is any utility for a sampling 
procedure different from random? That is, suppose there is a function which 
maps every element in the dataset to its probability of selection.
[~chengxiang li], yes. There is an ongoing PR 
(https://github.com/apache/flink/pull/757). And yes. It would certainly make 
sense to have a generic sample function.



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-07-21 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14635096#comment-14635096
 ] 

Till Rohrmann commented on FLINK-1901:
--

The problem is that a sampling operator should also work within iterations. 
There is definitely a big need for this, e.g. for stochastic gradient descent.

I don't really understand what you mean by your question [~sachingoel0101].



[jira] [Commented] (FLINK-1901) Create sample operator for Dataset

2015-04-17 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499396#comment-14499396
 ] 

Till Rohrmann commented on FLINK-1901:
--

A sampling operator for non-iterations can be realized as a mapPartition 
operator. This would also allow us to do sampling without replacement. 

The harder problem is to do sampling within an iteration. Usually, one would 
sample from a static code path which is only executed once. Maybe we can 
include the sampling operator on the dynamic code path to trigger it every 
iteration.
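
The non-iterative case can be pictured with a small Python model of a mapPartition-style operator, where each partition is sampled independently. This is a sketch, not the Flink API: `map_partition` and `make_sampler` are hypothetical helpers, and seeding each partition's generator with `seed + index` is an assumption made here for illustration.

```python
import random


def map_partition(partitions, fn):
    """Apply fn(index, items) to every partition independently."""
    return [fn(i, part) for i, part in enumerate(partitions)]


def make_sampler(fraction, seed):
    """Build a per-partition Bernoulli sampler (without replacement)."""
    def sample(index, items):
        # Assumption: one generator per partition, derived from the seed.
        rng = random.Random(seed + index)
        return [x for x in items if rng.random() < fraction]
    return sample


partitions = [list(range(0, 500)), list(range(500, 1000))]
first = map_partition(partitions, make_sampler(0.1, seed=11))
second = map_partition(partitions, make_sampler(0.1, seed=11))
```

The two runs agree only because the same items reach the same partition index in the same order both times; a seed alone does not give reproducibility if that assignment changes between runs.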
