[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708886#comment-14708886 ] ASF GitHub Bot commented on FLINK-1901: --- Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-134070592 Hey @ChengXiangLi, I have another concern, regarding the seed for sampling: it doesn't seem to serve its purpose. I sampled with a fraction three times using the same seed, but got different results every time. I've been stuck on this problem myself for quite some time. The fix is to use just `seed` instead of `seed + index`, but then the sample is not truly random. As far as I could figure out, splits of the data don't always arrive at the same subtask index.
Create sample operator for Dataset
Key: FLINK-1901
URL: https://issues.apache.org/jira/browse/FLINK-1901
Project: Flink
Issue Type: Improvement
Components: Core
Reporter: Theodore Vasiloudis
Assignee: Chengxiang Li
In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms, we need a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative or exact size of the sample, set a seed for reproducibility, and support sampling within iterations.
--
This message was sent by Atlassian JIRA (v6.3.4#6332)
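A minimal sketch of the `seed` versus `seed + index` trade-off, assuming each parallel subtask runs a Bernoulli sampler backed by `java.util.Random`. The class name, partition size, and fraction are hypothetical, and this is not Flink's sampler code. With a plain `seed`, every subtask draws the same random sequence and keeps the same positions within its split, so the overall sample is correlated across splits. With `seed + index`, the subtasks differ, but the result then depends on which split lands on which subtask index.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

public class SubtaskSeedSketch {

    /** Bernoulli-samples one partition and returns the positions (within the partition) that were kept. */
    static List<Integer> keptPositions(int partitionSize, double fraction, long seed) {
        Random rnd = new Random(seed);
        List<Integer> kept = new ArrayList<>();
        for (int pos = 0; pos < partitionSize; pos++) {
            if (rnd.nextDouble() < fraction) {
                kept.add(pos);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        long seed = 7L;
        for (int subtaskIndex = 0; subtaskIndex < 3; subtaskIndex++) {
            // Plain `seed`: every subtask draws the same sequence and keeps the same positions.
            List<Integer> sameSeed = keptPositions(20, 0.3, seed);
            // `seed + index`: subtasks differ, but the outcome depends on the split-to-index assignment.
            List<Integer> perSubtask = keptPositions(20, 0.3, seed + subtaskIndex);
            System.out.println("subtask " + subtaskIndex + ": seed -> " + sameSeed
                    + ", seed + index -> " + perSubtask);
        }
    }
}
```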
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708918#comment-14708918 ] ASF GitHub Bot commented on FLINK-1901: --- Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-134081197 Yes. I was only wondering whether we should at least ensure this when the sampling is done right at the source, though.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708921#comment-14708921 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-134082960 IMHO, this only makes the semantics of the sampling operator harder to understand, because it would behave differently depending on the job graph structure. I would rather document this limitation more prominently.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708867#comment-14708867 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-134063115 Thanks, @chiwanpark, I'm waiting for your results. Besides, we can reduce the false-failure case to an acceptable probability by widening the verification bounds, at the cost of making the verification weaker.
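To make the trade-off concrete, here is a minimal sketch that estimates how often a correct Bernoulli sampler would still fail a relative-tolerance check on the averaged sample fraction. It uses a normal approximation to the binomial. The class name and the parameter values (10,000 source elements, 5 repetitions, fraction 0.01) are hypothetical and are not taken from Flink's `RandomSamplerTest`. Because `java.lang.Math` has no error function, `erf` is approximated with the Abramowitz-Stegun formula 7.1.26.

```java
public class SampleFractionTolerance {

    /** Abramowitz-Stegun 7.1.26 approximation of erf(x); absolute error below 1.5e-7. */
    static double erf(double x) {
        double sign = Math.signum(x);
        double ax = Math.abs(x);
        double t = 1.0 / (1.0 + 0.3275911 * ax);
        double poly = t * (0.254829592 + t * (-0.284496736
                + t * (1.421413741 + t * (-1.453152027 + t * 1.061405429))));
        return sign * (1.0 - poly * Math.exp(-ax * ax));
    }

    /** Standard normal CDF. */
    static double phi(double z) {
        return 0.5 * (1.0 + erf(z / Math.sqrt(2.0)));
    }

    /**
     * Approximate probability that the average fraction over `repeats` independent Bernoulli
     * samples of `sourceSize` elements deviates from `fraction` by more than
     * `relTolerance * fraction`, even though the sampler itself is correct.
     */
    static double falseFailureProbability(long sourceSize, int repeats,
                                          double fraction, double relTolerance) {
        double std = Math.sqrt(fraction * (1.0 - fraction) / ((double) sourceSize * repeats));
        double z = relTolerance * fraction / std;
        return 2.0 * (1.0 - phi(z));
    }

    public static void main(String[] args) {
        // Hypothetical parameters: 10,000 source elements, 5 repetitions, fraction 0.01.
        for (double tolerance : new double[] {0.05, 0.10, 0.20}) {
            System.out.printf("tolerance %.2f -> P(false failure) ~ %.2e%n",
                    tolerance, falseFailureProbability(10_000, 5, 0.01, tolerance));
        }
    }
}
```

Widening the tolerance increases `z` and shrinks the false-failure probability quickly. However, it also lets a genuinely biased sampler pass more easily, which is the weakening mentioned above.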
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=1470#comment-1470 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-134070893 @thvasilo, the PR does not yet include proper sampling behaviour within iterations. See FLINK-2396.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708889#comment-14708889 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-134071161 @sachingoel0101, you're right. The problem is that Flink does not guarantee the order in which elements arrive. But this problem won't be fixed by setting the seed of all sampling operators to the same value. There might always be an operator, e.g. rebalance, which completely randomizes the element order.
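The order dependence can be seen even without parallelism. A seeded sampler that consumes elements in arrival order keeps the same *positions*, not the same *elements*. In the sketch below, the class name is hypothetical and a `Collections.shuffle` stands in for an order-randomizing operator such as rebalance. It samples the same data twice in the same order, then once in a different order, always with an identical seed.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;

public class ArrivalOrderSketch {

    /** Bernoulli-samples elements in the order they arrive, using a fixed seed. */
    static Set<Integer> sample(List<Integer> arrivalOrder, double fraction, long seed) {
        Random rnd = new Random(seed);
        Set<Integer> kept = new TreeSet<>();
        for (int element : arrivalOrder) {
            if (rnd.nextDouble() < fraction) {
                kept.add(element);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            data.add(i);
        }
        // Stand-in for an order-randomizing step such as rebalance: same elements, different order.
        List<Integer> reordered = new ArrayList<>(data);
        Collections.shuffle(reordered, new Random(123));

        System.out.println("run 1:     " + sample(data, 0.3, 42L));
        System.out.println("run 2:     " + sample(data, 0.3, 42L));
        System.out.println("reordered: " + sample(reordered, 0.3, 42L));
    }
}
```

The random draw sequence is identical, so the reordered run keeps the same *number* of elements, just different ones. This is why fixing the seed alone cannot make the sample reproducible when the arrival order changes.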
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708658#comment-14708658 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-133990256 Hi @sachingoel0101, when sampling with a fraction, it's not easy to verify whether the DataSet was actually sampled with the input fraction. In the test, I sample 5 times, use the average size to compute the result fraction, and then compare the result fraction with the input fraction, verifying that their difference is not more than 10%. The following case may still happen: the sampler samples the DataSet with the input fraction, but the sampled result size is so small or so large that it falls outside our verification condition. It does happen, just with a very small probability, say less than 0.001 in this test. It should be OK if this failure happens only very occasionally; please let me know if you find that it doesn't.
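The verification procedure described above (sample 5 times, average the sizes, accept if the relative difference from the input fraction is at most 10%) can be sketched as follows. This is a hypothetical stand-alone version with a plain Bernoulli sampler, not the code in `RandomSamplerTest`. The source size and fraction in `main` are illustrative assumptions. The `main` method counts how often a correct sampler is nevertheless rejected.

```java
import java.util.Random;

public class FractionVerificationSketch {

    /** Bernoulli sampling: keeps each element independently with probability `fraction`; returns the sample size. */
    static int bernoulliSampleSize(int sourceSize, double fraction, Random rnd) {
        int kept = 0;
        for (int i = 0; i < sourceSize; i++) {
            if (rnd.nextDouble() < fraction) {
                kept++;
            }
        }
        return kept;
    }

    /** Samples `repeats` times, averages the sizes, and accepts if the relative error is within `tolerance`. */
    static boolean verifyFraction(int sourceSize, double fraction, int repeats,
                                  double tolerance, Random rnd) {
        long total = 0;
        for (int r = 0; r < repeats; r++) {
            total += bernoulliSampleSize(sourceSize, fraction, rnd);
        }
        double resultFraction = (double) total / repeats / sourceSize;
        return Math.abs(resultFraction - fraction) / fraction <= tolerance;
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);
        int trials = 200;
        int failures = 0;
        for (int t = 0; t < trials; t++) {
            // The sampler is correct, so every rejection here is a spurious test failure.
            if (!verifyFraction(10_000, 0.01, 5, 0.10, rnd)) {
                failures++;
            }
        }
        System.out.println("spurious failures: " + failures + " / " + trials);
    }
}
```

How rare such failures are depends strongly on the source size and the fraction. Small fractions over small sources produce few sampled elements, so the averaged fraction is noisier.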
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14708701#comment-14708701 ] ASF GitHub Bot commented on FLINK-1901: --- Github user chiwanpark commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-133999705 @ChengXiangLi I know that it is hard to verify a random sampler implementation. But we need to fix this failing test, because it makes verifying other pull requests difficult: some tests of other pull requests fail in the K-S test and in the sampling test with fraction. There is a [JIRA issue](https://issues.apache.org/jira/browse/FLINK-2564) covering this. I'm testing with an increased number of samples and a larger source size. If I get a notable result, I'll post it.
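The K-S (Kolmogorov-Smirnov) test mentioned above compares two distributions via the largest gap between their empirical CDFs. A minimal two-sample sketch follows, using the common large-sample critical value. The class name and alpha are illustrative assumptions. This is not necessarily the statistic or threshold used in Flink's `RandomSamplerTest`.

```java
import java.util.Arrays;
import java.util.Random;

public class KsStatisticSketch {

    /** Two-sample Kolmogorov-Smirnov statistic: the largest gap between the two empirical CDFs. */
    static double ksStatistic(double[] a, double[] b) {
        double[] x = a.clone();
        double[] y = b.clone();
        Arrays.sort(x);
        Arrays.sort(y);
        int i = 0;
        int j = 0;
        double d = 0.0;
        while (i < x.length && j < y.length) {
            double v = Math.min(x[i], y[j]);
            // Step both empirical CDFs past every value equal to v, so ties are handled correctly.
            while (i < x.length && x[i] <= v) i++;
            while (j < y.length && y[j] <= v) j++;
            d = Math.max(d, Math.abs((double) i / x.length - (double) j / y.length));
        }
        // Once one sample is exhausted its CDF is 1 and the gap can only shrink, so d is final.
        return d;
    }

    /** Large-sample critical value c(alpha) * sqrt((n + m) / (n * m)), with c(alpha) = sqrt(-ln(alpha / 2) / 2). */
    static double criticalValue(int n, int m, double alpha) {
        double c = Math.sqrt(-Math.log(alpha / 2.0) / 2.0);
        return c * Math.sqrt((double) (n + m) / ((double) n * m));
    }

    public static void main(String[] args) {
        Random rnd = new Random(3);
        double[] a = new double[1000];
        double[] b = new double[1000];
        for (int k = 0; k < a.length; k++) {
            a[k] = rnd.nextDouble();
            b[k] = rnd.nextDouble();
        }
        double d = ksStatistic(a, b);
        double critical = criticalValue(a.length, b.length, 0.05);
        System.out.println("D = " + d + ", reject at alpha = 0.05: " + (d > critical));
    }
}
```

When both inputs come from the same distribution, this check still rejects in roughly a fraction alpha of runs. A test that applies it with a fixed alpha to fresh random data will therefore fail occasionally, even when the sampler is correct.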
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14707972#comment-14707972 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-133675023 It's great to have this in; I'll try to update the cross-validation and SGD to use it.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706538#comment-14706538 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-133371873 Looks great. Thanks a lot for your contribution @ChengXiangLi. Will merge it now.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706741#comment-14706741 ] ASF GitHub Bot commented on FLINK-1901: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/949
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14706861#comment-14706861 ] ASF GitHub Bot commented on FLINK-1901: --- Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-133461987 Hey @ChengXiangLi, I just observed a failure in a test case: https://travis-ci.org/sachingoel0101/flink/jobs/76649177 Here is the relevant statement: RandomSamplerTest.testPoissonSamplerFraction:116->verifySamplerFraction:249 expected fraction: 0.01, result fraction: 0.009000
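Assuming this test uses the 10% relative-tolerance check described elsewhere in this thread, the reported numbers sit right at the boundary. The printed result fraction is rounded to six decimals, so this is a borderline case rather than a gross deviation:

```latex
\frac{\lvert \hat{f} - f \rvert}{f}
  = \frac{\lvert 0.009000 - 0.01 \rvert}{0.01}
  = 0.1
```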
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704535#comment-14704535 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-132943584 Oh cool, then you were faster than me :-)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704676#comment-14704676 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-132972593 Move the `sample`/`sampleWithSize` operators to `DataSetUtils` and update the unit tests.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704508#comment-14704508 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-132935282 @ChengXiangLi, you're right, I should have noticed earlier and raised a flag. But your work is not in vain. I think it's an excellent piece of work, and the `sample` method could also become part of the core API right away. For the sake of completeness, let's do that once the `sampleWithSize` method also works robustly. I think your proposal for the next steps is a good way to continue. Once you've moved the `sample` and `sampleWithSize` methods to the `DataSetUtils` class, we'll merge this PR and close it. In the meantime, I'll create a JIRA for the topK operator, where we can discuss the matter further.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704530#comment-14704530 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-132943122 OK, @tillrohrmann, great to hear that. Besides, I've created [FLINK-2549](https://issues.apache.org/jira/browse/FLINK-2549) for the topK operator.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14703211#comment-14703211 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r37429899
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.operators.util;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.math3.distribution.PoissonDistribution;
+
+import java.util.Iterator;
+
+/**
+ * A sampler implementation based on Poisson Distribution. While sampling elements with fraction and replacement,
+ * the selected number of each element follows a given poisson distribution, so we could use poisson
+ * distribution to generate random variables for sample.
+ *
+ * @param <T> The type of sample.
+ * @see <a href="https://en.wikipedia.org/wiki/Poisson_distribution">https://en.wikipedia.org/wiki/Poisson_distribution</a>
+ */
+public class PoissonSampler<T> extends RandomSampler<T> {
+
+	private PoissonDistribution poissonDistribution;
+	private final double fraction;
+
+	/**
+	 * Create a poisson sampler which would sample elements with replacement.
+	 *
+	 * @param fraction The expected count of each element.
+	 * @param seed Random number generator seed for internal PoissonDistribution.
+	 */
+	public PoissonSampler(double fraction, long seed) {
+		Preconditions.checkArgument(fraction >= 0, "fraction should be positive.");
+		this.fraction = fraction;
+		if (this.fraction > 0) {
+			this.poissonDistribution = new PoissonDistribution(fraction);
+			this.poissonDistribution.reseedRandomGenerator(seed);
+		}
+	}
+
+	/**
+	 * Create a poisson sampler which would sample elements with replacement.
+	 *
+	 * @param fraction The expected count of each element.
+	 */
+	public PoissonSampler(double fraction) {
+		Preconditions.checkArgument(fraction >= 0, "fraction should be non-negative.");
+		this.fraction = fraction;
+		if (this.fraction > 0) {
+			this.poissonDistribution = new PoissonDistribution(fraction);
+		}
+	}
+
+	/**
+	 * Sample the input elements, for each input element, generate its count with poisson distribution random variables generation.
+	 *
+	 * @param input Elements to be sampled.
+	 * @return The sampled result which is lazy computed upon input elements.
+	 */
+	@Override
+	public Iterator<T> sample(final Iterator<T> input) {
+		if (fraction == 0) {
+			return EMPTY_ITERABLE;
+		}
+
+		return new SampledIterator<T>() {
+			T currentElement;
+			int currentCount = 0;
+
+			@Override
+			public boolean hasNext() {
+				if (currentElement == null || currentCount == 0) {
+					while (input.hasNext()) {
+						currentElement = input.next();
+						currentCount = poissonDistribution.sample();
+						if (currentCount > 0) {
+							return true;
+						}
+					}
+					return false;
+				}
+				return true;
+			}
+
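The idea in the `PoissonSampler` Javadoc can be illustrated without dependencies. For sampling with replacement, each element is emitted k times, with k drawn from a Poisson distribution whose mean is the fraction. The sketch below is hypothetical (class and method names included). It draws k with Knuth's multiplication method on `java.util.Random` instead of commons-math3's `PoissonDistribution`, and it materializes the result eagerly into a `List` for brevity, whereas the sampler in the diff returns a lazy iterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

public class PoissonSamplingSketch {

    /**
     * Knuth's multiplication method for drawing k ~ Poisson(lambda). Adequate for the small
     * lambda values typical of sampling fractions; exp(-lambda) underflows for very large lambda.
     */
    static int poisson(double lambda, Random rnd) {
        double limit = Math.exp(-lambda);
        double product = rnd.nextDouble();
        int k = 0;
        while (product > limit) {
            k++;
            product *= rnd.nextDouble();
        }
        return k;
    }

    /** Sampling with replacement: each element is emitted k times, with k ~ Poisson(fraction). */
    static <T> List<T> sampleWithReplacement(Iterator<T> input, double fraction, long seed) {
        if (fraction < 0) {
            throw new IllegalArgumentException("fraction should be non-negative.");
        }
        Random rnd = new Random(seed);
        List<T> sampled = new ArrayList<>();
        while (input.hasNext()) {
            T element = input.next();
            int count = poisson(fraction, rnd);
            for (int c = 0; c < count; c++) {
                sampled.add(element);
            }
        }
        return sampled;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(sampleWithReplacement(input.iterator(), 1.5, 42L));
    }
}
```

The expected output size is `fraction` times the input size. A fraction of 0 yields an empty result, matching the `EMPTY_ITERABLE` shortcut in the diff.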
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14703347#comment-14703347 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-132692640 Sorry for the late review, @ChengXiangLi. I've finished it now and the PR looks really good. There is only one thing I would like to change before merging it: moving the `sample` and `sampleWithSize` methods from the `DataSet` class to the `DataSetUtils` class. This will effectively make the `sample` methods not part of the core API. The reason is that the `sampleWithSize` method breaks one of Flink's guarantees, namely the robustness of the core API functions against `OutOfMemoryError`s. Let me elaborate on it for better understanding.

All of Flink's internal operations work on serialized data which is stored in Flink's *managed* memory. The managed memory allows Flink to detect when to spill elements from memory to disk to avoid out-of-memory errors and, thus, to make the system robust. The managed memory is a pre-allocated area of memory which is administered by the `MemoryManager`. The `MemoryManager` allows you to allocate and deallocate `MemorySegments` in a C-style fashion.

However, once a data item enters a UDF, it has to be deserialized, which puts it on the remaining heap. This is not a problem as long as your UDF does not accumulate these elements. However, the `sampleWithSize` method materializes up to `numSamples` elements on the heap. Depending on the number of samples and the data item size, this might be enough to eat up all of the remaining heap space and crash the JVM. I think that your current implementation will work for most use cases, but in order to make it part of the core API, we also have to deal with the case where the sample cannot be materialized on the remaining heap of a running Flink program.
In order to achieve this, I think it would be necessary to implement a native `topK` operator. By *native* I mean an operator which works on Flink's managed memory and, thus, can also deal with spilling records to disk. With such a `topK` operator, we could reimplement the reservoir sampling algorithm the following way: for sampling without replacement, we first assign a weight to each element in a map operation, then call `topK` with respect to the weights to obtain the sample. For sampling with replacement, we could simply use a flatMap operation to assign a weight to each element `numSamples` times, and then again call `topK` with respect to the weights.

For the `topK` implementation, we would need something like a `PriorityQueue` which operates on managed memory (similar to the `CompactingHashTable`, which is a hash table working on managed memory). Thus, we would have a priority queue which stores the priority value of each record and a pointer to the record, which is kept in managed memory. Whenever an element is removed from the priority queue, we can also free the managed memory it occupies. If we run out of managed memory, we have to spill to disk some of the records that are still in the race for the top k. As a first step, we can skip the spilling and just throw a proper exception (other than `OutOfMemoryError`) when we run out of memory. Afterwards, we can incrementally add the spilling functionality.

I know that you've already spent a lot of effort on writing the sampling operator and that this result might be a little bit demotivating. However, if we want to make it right and robust, then I think this is the way to go. Additionally, we would add a proper `topK` operator to Flink's API, which is missing big time :-) If you want to, you could also take the lead here. The further discussion should then happen in a separate issue. I'm more than willing to assist you in implementing this operator. What do you think?
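The weight-then-`topK` idea for sampling *without* replacement can be sketched on a single partition. The sketch attaches an independent uniform random weight to every element and keeps the `numSamples` largest weights; the resulting subset is uniformly random. It uses the on-heap `java.util.PriorityQueue` rather than a managed-memory queue, so it shows only the selection logic, not spilling. A distributed version would presumably take a local top-k per partition and then merge those by weight. The class and method names are hypothetical, and the with-replacement variant is not shown.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Random;

public class WeightedTopKSampleSketch {

    /**
     * Sampling without replacement via top-k: attach a uniform random weight to every element
     * and keep the numSamples elements with the largest weights. The heap never holds more
     * than numSamples entries.
     */
    static <T> List<T> sampleWithoutReplacement(Iterator<T> input, int numSamples, long seed) {
        if (numSamples < 0) {
            throw new IllegalArgumentException("numSamples should be non-negative.");
        }
        Random rnd = new Random(seed);
        // Min-heap ordered by weight: the head is the weakest candidate still in the race.
        PriorityQueue<Map.Entry<Double, T>> heap =
                new PriorityQueue<>(Math.max(1, numSamples), Map.Entry.<Double, T>comparingByKey());
        while (input.hasNext()) {
            T element = input.next();
            double weight = rnd.nextDouble();
            if (heap.size() < numSamples) {
                heap.add(new AbstractMap.SimpleEntry<>(weight, element));
            } else if (numSamples > 0 && weight > heap.peek().getKey()) {
                // The new element beats the weakest kept candidate, which drops out of the race.
                heap.poll();
                heap.add(new AbstractMap.SimpleEntry<>(weight, element));
            }
        }
        List<T> sample = new ArrayList<>(heap.size());
        for (Map.Entry<Double, T> entry : heap) {
            sample.add(entry.getValue());
        }
        return sample;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(sampleWithoutReplacement(input.iterator(), 3, 42L));
    }
}
```

Heap usage is bounded by `numSamples` entries, which is the property discussed in this thread. A managed-memory implementation would enforce that bound against Flink's memory budget instead of the JVM heap.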
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14704214#comment-14704214 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-132864761 Thanks for the detailed explanation, @tillrohrmann. As the owner of this issue, making it work correctly and efficiently is my top priority, so I would never say no to a reasonable and proper suggestion (although I did expect it to come earlier :-)). `OutOfMemoryError` was one of my concerns during the implementation as well. Unlimited usage of the Java heap is unacceptable, so I limited the reservoir size (the `PriorityQueue` size in the implementation) to the `numSamples` parameter, which gives the user full control over the Java heap memory used by the `sample` operator. It may lead to an `OutOfMemoryError` if `numSamples` is too large, but other operators may lead to an `OutOfMemoryError` as well if the user caches too many objects inside a UDF. Anyway, that was just my first thought; fully managed memory usage is a better solution, and I would like to implement it. As mentioned in the previous comment, are these the next steps you prefer?
1. Move the `sample` and `sampleWithSize` methods from `DataSet` to the `DataSetUtils` class, and merge this PR.
2. Implement the `topK` operator in a separate PR.
3. Update the `sampleWithSize` implementation, and move it back to `DataSet`.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14701026#comment-14701026 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-132152536 Hi @tillrohrmann, do you have some time to continue reviewing this PR and help push it forward?
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14702304#comment-14702304 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r37373497 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.Iterator; + +/** + * A sampler implementation based on Poisson Distribution. While sampling elements with fraction and replacement, + * the selected number of each element follows a given poisson distribution, so we could use poisson + * distribution to generate random variables for sample. + * + * @param <T> The type of sample. 
+ * @see <a href="https://en.wikipedia.org/wiki/Poisson_distribution">https://en.wikipedia.org/wiki/Poisson_distribution</a> + */ +public class PoissonSampler<T> extends RandomSampler<T> { + + private PoissonDistribution poissonDistribution; + private final double fraction; + + /** +* Create a poisson sampler which would sample elements with replacement. +* +* @param fraction The expected count of each element. +* @param seed Random number generator seed for internal PoissonDistribution. +*/ + public PoissonSampler(double fraction, long seed) { + Preconditions.checkArgument(fraction >= 0, "fraction should be positive."); + this.fraction = fraction; + if (this.fraction > 0) { + this.poissonDistribution = new PoissonDistribution(fraction); + this.poissonDistribution.reseedRandomGenerator(seed); + } + } + + /** +* Create a poisson sampler which would sample elements with replacement. +* +* @param fraction The expected count of each element. +*/ + public PoissonSampler(double fraction) { + Preconditions.checkArgument(fraction >= 0, "fraction should be non-negative."); + this.fraction = fraction; + if (this.fraction > 0) { + this.poissonDistribution = new PoissonDistribution(fraction); + } + } + + /** +* Sample the input elements, for each input element, generate its count with poisson distribution random variables generation. +* +* @param input Elements to be sampled. +* @return The sampled result which is lazy computed upon input elements. +*/ + @Override + public Iterator<T> sample(final Iterator<T> input) { + if (fraction == 0) { + return EMPTY_ITERABLE; + } + + return new SampledIterator<T>() { + T currentElement; + int currentCount = 0; + + @Override + public boolean hasNext() { + if (currentElement == null || currentCount == 0) { + while (input.hasNext()) { + currentElement = input.next(); + currentCount = poissonDistribution.sample(); + if (currentCount > 0) { + return true; + } + } + return false; + } + return true; + } +
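The idea in the `PoissonSampler` Javadoc, that each element's number of copies follows a Poisson distribution whose mean is `fraction`, can be sketched without Commons Math. This is a hypothetical helper, not the PR's class: it swaps `PoissonDistribution` for Knuth's multiplication method on `java.util.Random`, which is only suitable for small `fraction` values (its running time grows with the mean, and `Math.exp(-fraction)` underflows for very large means), and it collects results eagerly instead of returning a lazy iterator.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch: sampling with replacement by drawing a Poisson copy count per element. */
final class PoissonSamplingSketch {

    /** Knuth's multiplication method for a Poisson(lambda) variate; intended for small lambda. */
    static int poisson(double lambda, Random random) {
        double limit = Math.exp(-lambda);
        int k = 0;
        double product = random.nextDouble();
        // Multiply uniforms until the running product drops to exp(-lambda) or below.
        while (product > limit) {
            k++;
            product *= random.nextDouble();
        }
        return k;
    }

    /** Emits each element c times with c ~ Poisson(fraction), so E[c] == fraction (may exceed 1). */
    static <T> List<T> sample(Iterator<T> input, double fraction, long seed) {
        if (fraction < 0) {
            throw new IllegalArgumentException("fraction must be non-negative");
        }
        List<T> out = new ArrayList<T>();
        if (fraction == 0) {
            return out; // like the diff, a zero fraction yields an empty sample
        }
        Random random = new Random(seed);
        while (input.hasNext()) {
            T element = input.next();
            int copies = poisson(fraction, random);
            for (int i = 0; i < copies; i++) {
                out.add(element);
            }
        }
        return out;
    }
}
```

Seeding a single `Random` makes the output reproducible for a given seed and a given input order; if the input order changes, the sample changes too.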
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14701440#comment-14701440 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r37313995 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.Iterator; + +/** + * A sampler implementation based on Poisson Distribution. While sampling elements with fraction and replacement, + * the selected number of each element follows a given poisson distribution, so we could use poisson + * distribution to generate random variables for sample. + * + * @param <T> The type of sample. 
+ * @see <a href="https://en.wikipedia.org/wiki/Poisson_distribution">https://en.wikipedia.org/wiki/Poisson_distribution</a> + */ +public class PoissonSampler<T> extends RandomSampler<T> { + + private PoissonDistribution poissonDistribution; + private final double fraction; + + /** +* Create a poisson sampler which would sample elements with replacement. +* +* @param fraction The expected count of each element. +* @param seed Random number generator seed for internal PoissonDistribution. +*/ + public PoissonSampler(double fraction, long seed) { + Preconditions.checkArgument(fraction >= 0, "fraction should be positive."); + this.fraction = fraction; + if (this.fraction > 0) { + this.poissonDistribution = new PoissonDistribution(fraction); + this.poissonDistribution.reseedRandomGenerator(seed); + } + } + + /** +* Create a poisson sampler which would sample elements with replacement. +* +* @param fraction The expected count of each element. +*/ + public PoissonSampler(double fraction) { + Preconditions.checkArgument(fraction >= 0, "fraction should be non-negative."); + this.fraction = fraction; + if (this.fraction > 0) { + this.poissonDistribution = new PoissonDistribution(fraction); + } + } + + /** +* Sample the input elements, for each input element, generate its count with poisson distribution random variables generation. +* +* @param input Elements to be sampled. +* @return The sampled result which is lazy computed upon input elements. +*/ + @Override + public Iterator<T> sample(final Iterator<T> input) { + if (fraction == 0) { + return EMPTY_ITERABLE; + } + + return new SampledIterator<T>() { + T currentElement; + int currentCount = 0; + + @Override + public boolean hasNext() { + if (currentElement == null || currentCount == 0) { + while (input.hasNext()) { + currentElement = input.next(); + currentCount = poissonDistribution.sample(); + if (currentCount > 0) { + return true; + } + } + return false; + } + return true; + } +
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14696619#comment-14696619 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-131001106 Thanks for the review, @thvasilo. I've addressed all comments in the latest commit.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14696632#comment-14696632 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r37054584 --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala.operators + +import java.util.{List => JavaList, Random} + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} +import org.junit.Assert._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.{Before, After, Test} + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SampleITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + private val RNG: Random = new Random + + private var result: JavaList[String] = null; + + @Before + def initiate { +ExecutionEnvironment.getExecutionEnvironment.setParallelism(5) + } + + @After + def after() = { +TestBaseUtils.containsResultAsText(result, getSourceStrings) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithoutReplacement { +verifySamplerWithFractionWithoutReplacement(0d) +verifySamplerWithFractionWithoutReplacement(0.2d) +verifySamplerWithFractionWithoutReplacement(1.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithReplacement { +verifySamplerWithFractionWithReplacement(0d) +verifySamplerWithFractionWithReplacement(0.2d) +verifySamplerWithFractionWithReplacement(1.0d) +verifySamplerWithFractionWithReplacement(2.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithoutReplacement { +verifySamplerWithFixedSizeWithoutReplacement(0) +verifySamplerWithFixedSizeWithoutReplacement(2) +verifySamplerWithFixedSizeWithoutReplacement(21) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithReplacement { +verifySamplerWithFixedSizeWithReplacement(0) +verifySamplerWithFixedSizeWithReplacement(2) +verifySamplerWithFixedSizeWithReplacement(21) + } + + @throws(classOf[Exception]) + private def 
verifySamplerWithFractionWithoutReplacement(fraction: Double) { +verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithoutReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(false, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double) { +verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(true, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFraction(withReplacement: Boolean, fraction: Double, seed: Long) { +val ds = getSourceDataSet() +val sampled = ds.sample(withReplacement, fraction, seed) +result = sampled.collect.asJava --- End diff -- Thanks, I missed that.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14694886#comment-14694886 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36950634 --- Diff: flink-scala/src/main/scala/org/apache/flink/api/scala/DataSet.scala --- @@ -1182,6 +1184,60 @@ class DataSet[T: ClassTag](set: JavaDataSet[T]) { getCallLocationName())) // + // Sample + // + /** + * Generate a sample of DataSet by the probability fraction of each element. + * + * @param withReplacement Whether element can be selected more than once. + * @param fraction Probability that each element is chosen, should be [0,1] without + *replacement, and [0, ∞) with replacement. While fraction is larger + *than 1, the elements are expected to be selected multi times into + *sample on average. + * @param seed Random number generator seed. + * @return The sampled DataSet + */ + def sample( + withReplacement: Boolean, + fraction: Double, + seed: Long = Utils.RNG.nextLong()): DataSet[T] = { + +wrap(new MapPartitionOperator[T, T](javaSet, + getType(), + new SampleWithFraction(withReplacement, fraction, seed), + getCallLocationName())) + } + + /** + * Generate a sample of DataSet by the probability fraction of each element. --- End diff -- This Javadoc is copied from the fraction-based `sample` method.
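The Javadoc's statement that elements are "expected to be selected multi times" when the fraction exceeds 1 can be made precise. Assuming the fraction-based `sample` uses an independent Bernoulli trial per element without replacement and an independent Poisson copy count per element with replacement (as the `BernoulliSampler` and `PoissonSampler` diffs in this PR suggest), and writing $p$ or $\lambda$ for `fraction` and $n$ for the input size, the sample size $|S|$ is itself random:

```latex
\text{Without replacement } (0 \le p \le 1):\quad
  |S| = \sum_{i=1}^{n} B_i,\; B_i \sim \mathrm{Bernoulli}(p)
  \;\Rightarrow\; |S| \sim \mathrm{Binomial}(n, p),\quad
  \mathbb{E}|S| = np,\quad \mathrm{Var}|S| = np(1-p)

\text{With replacement } (\lambda \ge 0):\quad
  |S| = \sum_{i=1}^{n} C_i,\; C_i \sim \mathrm{Poisson}(\lambda)
  \;\Rightarrow\; |S| \sim \mathrm{Poisson}(n\lambda),\quad
  \mathbb{E}\,C_i = \lambda,\quad \mathbb{E}|S| = \mathrm{Var}|S| = n\lambda
```

For example, with $n = 1000$ and a fraction of 2.0 with replacement, each element is expected to appear twice and the sample holds about 2000 elements with a standard deviation of roughly 45, so fraction-based sampling never guarantees an exact sample size.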
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14694981#comment-14694981 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36957769 --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.api.scala.operators + +import java.util.{List => JavaList, Random} + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} +import org.junit.Assert._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.{Before, After, Test} + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SampleITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + private val RNG: Random = new Random + + private var result: JavaList[String] = null; + + @Before + def initiate { +ExecutionEnvironment.getExecutionEnvironment.setParallelism(5) + } + + @After + def after() = { +TestBaseUtils.containsResultAsText(result, getSourceStrings) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithoutReplacement { +verifySamplerWithFractionWithoutReplacement(0d) +verifySamplerWithFractionWithoutReplacement(0.2d) +verifySamplerWithFractionWithoutReplacement(1.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithReplacement { +verifySamplerWithFractionWithReplacement(0d) +verifySamplerWithFractionWithReplacement(0.2d) +verifySamplerWithFractionWithReplacement(1.0d) +verifySamplerWithFractionWithReplacement(2.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithoutReplacement { +verifySamplerWithFixedSizeWithoutReplacement(0) +verifySamplerWithFixedSizeWithoutReplacement(2) +verifySamplerWithFixedSizeWithoutReplacement(21) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithReplacement { +verifySamplerWithFixedSizeWithReplacement(0) +verifySamplerWithFixedSizeWithReplacement(2) +verifySamplerWithFixedSizeWithReplacement(21) + } + + @throws(classOf[Exception]) + private def 
verifySamplerWithFractionWithoutReplacement(fraction: Double) { +verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithoutReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(false, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double) { +verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(true, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFraction(withReplacement: Boolean, fraction: Double, seed: Long) { +val ds = getSourceDataSet() +val sampled = ds.sample(withReplacement, fraction, seed) +result = sampled.collect.asJava --- End diff -- Is this result checked for validity somewhere?
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14695031#comment-14695031 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36960080 --- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java --- @@ -451,6 +454,53 @@ protected static File asFile(String path) { assertEquals(extectedStrings[i], resultStrings[i]); } } + + // + // Comparison methods for tests using sample + // + + public static <T> void containsResultAsTuples(List<T> result, String expected) { --- End diff -- Is this used anywhere?
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14694940#comment-14694940 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36955412 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.Iterator; +import java.util.Random; + +/** + * A sampler implementation built upon Bernoulli Trail. For sample with fraction and without replacement, + * each element sample choice is just a bernoulli trail. --- End diff -- Do you mean Bernoulli _trial_ here?
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14694943#comment-14694943 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36955515 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.Iterator; +import java.util.Random; + +/** + * A sampler implementation built upon Bernoulli Trail. For sample with fraction and without replacement, + * each element sample choice is just a bernoulli trail. + * + * @param <T> The type of sample. + */ +public class BernoulliSampler<T> extends RandomSampler<T> { + + private final double fraction; + private final Random random; + + /** +* Create a bernoulli sampler sample fraction and default random number generator. 
--- End diff -- *B*ernoulli should be capitalized in all mentions.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14695034#comment-14695034 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36960527 --- Diff: flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java --- @@ -451,6 +454,53 @@ protected static File asFile(String path) { assertEquals(extectedStrings[i], resultStrings[i]); } } + + // + // Comparison methods for tests using sample + // + + public static <T> void containsResultAsTuples(List<T> result, String expected) { + isExpectedContainsResult(result, expected, true); + } + + public static <T> void containsResultAsText(List<T> result, String expected) { + isExpectedContainsResult(result, expected, false); + } + + private static <T> void isExpectedContainsResult(List<T> result, String expected, boolean asTuple) { --- End diff -- Can we get comments explaining the functionality of this and `containsResultAsText`?
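The diff does not show the body of `isExpectedContainsResult`, so the following is only one plausible reading of what a "result is contained in the source" check could do, using hypothetical names (`SampleAssertions`, `containsAsText`, `containsAsTextWithMultiplicity`): every sampled element, compared through its string form, must appear among the newline-separated expected lines. A stricter variant for sampling without replacement also bounds how often each element may appear.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helpers showing what a "sample is contained in source" assertion might check. */
final class SampleAssertions {

    /** Every sampled element (via toString) must be one of the newline-separated expected lines. */
    static <T> void containsAsText(List<T> result, String expected) {
        Set<String> allowed = new HashSet<String>(Arrays.asList(expected.split("\n")));
        for (T element : result) {
            String text = String.valueOf(element);
            if (!allowed.contains(text)) {
                throw new AssertionError("Sampled element not in source: " + text);
            }
        }
    }

    /** Stricter check for sampling without replacement: no element may appear more often than in the source. */
    static <T> void containsAsTextWithMultiplicity(List<T> result, String expected) {
        Map<String, Integer> remaining = new HashMap<String, Integer>();
        for (String line : expected.split("\n")) {
            remaining.merge(line, 1, Integer::sum); // count occurrences in the source
        }
        for (T element : result) {
            String text = String.valueOf(element);
            Integer left = remaining.get(text);
            if (left == null || left == 0) {
                throw new AssertionError("Sampled element missing or over-represented: " + text);
            }
            remaining.put(text, left - 1);
        }
    }
}
```

Only the plain containment check holds for sampling with replacement, where the same source element may legitimately appear several times in the sample.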
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14695003#comment-14695003 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36958345 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java --- @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.Iterator; +import java.util.Random; + +/** + * A sampler implementation built upon Bernoulli Trail. For sample with fraction and without replacement, + * each element sample choice is just a bernoulli trail. + * + * @param <T> The type of sample. + */ +public class BernoulliSampler<T> extends RandomSampler<T> { + + private final double fraction; + private final Random random; + + /** +* Create a bernoulli sampler sample fraction and default random number generator. +* +* @param fraction Sample fraction, aka the bernoulli sampler possibility. 
+*/ + public BernoulliSampler(double fraction) { + this(fraction, new Random()); + } + + /** +* Create a bernoulli sampler sample fraction and random number generator seed. +* +* @param fraction Sample fraction, aka the bernoulli sampler possibility. +* @param seed Random number generator seed. +*/ + public BernoulliSampler(double fraction, long seed) { + this(fraction, new Random(seed)); + } + + /** +* Create a bernoulli sampler sample fraction and random number generator. +* +* @param fraction Sample fraction, aka the bernoulli sampler possibility. +* @param random The random number generator. +*/ + public BernoulliSampler(double fraction, Random random) { + Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction fraction must between [0, 1]."); + this.fraction = fraction; + this.random = random; + } + + /** +* Sample the input elements, for each input element, take a Bernoulli Trail for sample. +* +* @param input Elements to be sampled. +* @return The sampled result which is lazy computed upon input elements. +*/ + @Override + public Iterator<T> sample(final Iterator<T> input) { + if (fraction == 0) { + return EMPTY_ITERABLE; + } + + return new SampledIterator<T>() { + T current; + + @Override + public boolean hasNext() { + if (current == null) { + while (input.hasNext()) { + T element = input.next(); + if (random.nextDouble() <= fraction) { + current = element; + return true; + } + } + current = null; + return false; + } else { + return true; + } + } + + @Override + public T next() { --- End diff -- It feels a bit counterintuitive that the next element is prepared in the `hasNext()` function. Doesn't this mean that `hasNext()` **needs** to be called every time before we call
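One way to address this concern is to let `next()` fill the look-ahead buffer itself, so correctness no longer depends on callers invoking `hasNext()` first. The sketch below is hypothetical (`LazyBernoulliIterator` is not part of the PR). It performs one Bernoulli trial per input element with `java.util.Random`, keeping an element when `nextDouble() < fraction`, and it tracks the buffered element with a boolean flag instead of a `null` check.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Random;

/** Hypothetical sketch: lazy Bernoulli sampling with an explicit look-ahead buffer. */
final class LazyBernoulliIterator<T> implements Iterator<T> {
    private final Iterator<T> input;
    private final double fraction;
    private final Random random;
    private T buffered;          // next accepted element, valid only while hasBuffered is true
    private boolean hasBuffered; // flag instead of a null check, so null elements are handled too

    LazyBernoulliIterator(Iterator<T> input, double fraction, long seed) {
        if (fraction < 0 || fraction > 1) {
            throw new IllegalArgumentException("fraction must be in [0, 1]");
        }
        this.input = input;
        this.fraction = fraction;
        this.random = new Random(seed);
    }

    @Override
    public boolean hasNext() {
        // Idempotent: once an element is buffered, further calls consume no input.
        while (!hasBuffered && input.hasNext()) {
            T candidate = input.next();
            // One Bernoulli trial per element: keep it with probability `fraction`.
            if (random.nextDouble() < fraction) {
                buffered = candidate;
                hasBuffered = true;
            }
        }
        return hasBuffered;
    }

    @Override
    public T next() {
        // next() fills the buffer itself, so callers need not call hasNext() first.
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        hasBuffered = false;
        T result = buffered;
        buffered = null;
        return result;
    }
}
```

Because `hasNext()` only reads input while the buffer is empty, calling it twice in a row does not skip elements, and `next()` throws `NoSuchElementException` once the input is exhausted, as the `Iterator` contract requires.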
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14695166#comment-14695166 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36969808 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/DataSet.java --- @@ -1057,7 +1061,68 @@ public long count() throws Exception { public UnionOperatorT union(DataSetT other){ return new UnionOperatorT(this, other, Utils.getCallLocationName()); } + + // + // Sample + // + + /** +* Generate a sample of DataSet by the probability fraction of each element. +* +* @param withReplacement Whether element can be selected more than once. +* @param fractionProbability that each element is chosen, should be [0,1] without replacement, +*and [0, ∞) with replacement. While fraction is larger than 1, the elements are +*expected to be selected multi times into sample on average. +* @return The sampled DataSet +*/ + public MapPartitionOperatorT, T sample(final boolean withReplacement, final double fraction) { + return sample(withReplacement, fraction, Utils.RNG.nextLong()); + } + + /** +* Generate a sample of DataSet by the probability fraction of each element. +* +* @param withReplacement Whether element can be selected more than once. +* @param fractionProbability that each element is chosen, should be [0,1] without replacement, +*and [0, ∞) with replacement. While fraction is larger than 1, the elements are +*expected to be selected multi times into sample on average. +* @param seedrandom number generator seed. +* @return The sampled DataSet +*/ + public MapPartitionOperatorT, T sample(final boolean withReplacement, final double fraction, final long seed) { + return mapPartition(new SampleWithFractionT(withReplacement, fraction, seed)); + } + + /** +* Generate a sample of DataSet which contains fixed size elements. 
+* +* @param withReplacement Whether element can be selected more than once. +* @param numSample The expected sample size. +* @return The sampled DataSet +*/ --- End diff -- Maybe we want to include a note that this kind of sampling currently takes 2 passes over the data, and recommend using fraction unless exact precision is necessary. Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative or exact size of the sample, set a seed for reproducibility, and support sampling within iterations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
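The Javadoc above says that with replacement and a fraction larger than 1, each element is expected to be selected several times. One common way to get that behaviour in a single pass is to emit each element k times, with k drawn from a Poisson distribution whose mean is the fraction. The sketch below illustrates this on a single JVM under that assumption; it is not the PR's `SampleWithFraction`, and the class `PoissonFractionSampler` and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch: sampling with replacement by fraction via Poisson-distributed copy counts. */
final class PoissonFractionSampler {

    /** Draws from Poisson(mean) with Knuth's multiplication method; suitable for small means such as fractions. */
    static int nextPoisson(Random random, double mean) {
        double limit = Math.exp(-mean);
        int k = 0;
        double product = random.nextDouble();
        while (product > limit) {
            k++;
            product *= random.nextDouble();
        }
        return k;
    }

    /** One pass: each element is copied k times, k ~ Poisson(fraction), so fraction may exceed 1. */
    static <T> List<T> sample(Iterator<T> input, double fraction, long seed) {
        if (fraction < 0) {
            throw new IllegalArgumentException("fraction must be >= 0 when sampling with replacement");
        }
        Random random = new Random(seed);
        List<T> result = new ArrayList<>();
        while (input.hasNext()) {
            T element = input.next();
            int copies = nextPoisson(random, fraction);
            for (int i = 0; i < copies; i++) {
                result.add(element);
            }
        }
        return result;
    }
}
```

With a fixed seed the output is reproducible, and for fraction 2 the sample size is about twice the input size on average.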
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696369#comment-14696369 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r37046195 --- Diff: flink-tests/src/test/scala/org/apache/flink/api/scala/operators/SampleITCase.scala --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.flink.api.scala.operators + +import java.util.{List => JavaList, Random} + +import org.apache.flink.api.scala._ +import org.apache.flink.api.scala.util.CollectionDataSets +import org.apache.flink.test.util.MultipleProgramsTestBase.TestExecutionMode +import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} +import org.junit.Assert._ +import org.junit.runner.RunWith +import org.junit.runners.Parameterized +import org.junit.{Before, After, Test} + +import scala.collection.JavaConverters._ + +@RunWith(classOf[Parameterized]) +class SampleITCase(mode: TestExecutionMode) extends MultipleProgramsTestBase(mode) { + private val RNG: Random = new Random + + private var result: JavaList[String] = null; + + @Before + def initiate { +ExecutionEnvironment.getExecutionEnvironment.setParallelism(5) + } + + @After + def after() = { +TestBaseUtils.containsResultAsText(result, getSourceStrings) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithoutReplacement { +verifySamplerWithFractionWithoutReplacement(0d) +verifySamplerWithFractionWithoutReplacement(0.2d) +verifySamplerWithFractionWithoutReplacement(1.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithFractionWithReplacement { +verifySamplerWithFractionWithReplacement(0d) +verifySamplerWithFractionWithReplacement(0.2d) +verifySamplerWithFractionWithReplacement(1.0d) +verifySamplerWithFractionWithReplacement(2.0d) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithoutReplacement { +verifySamplerWithFixedSizeWithoutReplacement(0) +verifySamplerWithFixedSizeWithoutReplacement(2) +verifySamplerWithFixedSizeWithoutReplacement(21) + } + + @Test + @throws(classOf[Exception]) + def testSamplerWithSizeWithReplacement { +verifySamplerWithFixedSizeWithReplacement(0) +verifySamplerWithFixedSizeWithReplacement(2) +verifySamplerWithFixedSizeWithReplacement(21) + } + + @throws(classOf[Exception]) + private def 
verifySamplerWithFractionWithoutReplacement(fraction: Double) { +verifySamplerWithFractionWithoutReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithoutReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(false, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double) { +verifySamplerWithFractionWithReplacement(fraction, RNG.nextLong) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFractionWithReplacement(fraction: Double, seed: Long) { +verifySamplerWithFraction(true, fraction, seed) + } + + @throws(classOf[Exception]) + private def verifySamplerWithFraction(withReplacement: Boolean, fraction: Double, seed: Long) { +val ds = getSourceDataSet() +val sampled = ds.sample(withReplacement, fraction, seed) +result = sampled.collect.asJava --- End diff -- The validity of the sample result is verified in the after() method of each test. As the source data is very small, verifying the fraction does not make much sense, so I didn't check that the sample size matches the fraction.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14694548#comment-14694548 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36934499 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/operators/util/RandomSamplerTest.java --- @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * This test suite try to verify whether all the random samplers work as we expected, which mainly focus on: + * <ul> + * <li>Does sampled result fit into input parameters? we check parameters like sample fraction, sample size, + * w/o replacement, and so on.</li> + * <li>Does sampled result randomly selected? we verify this by measure how much does it distributed on source data. + * Run Kolmogorov-Smirnov (KS) test between the random samplers and default reference samplers which is distributed + * well-proportioned on source data. If random sampler select elements randomly from source, it would distributed + * well-proportioned on source data as well. The KS test will fail to strongly reject the null hypothesis that + * the distributions of sampling gaps are the same. + * </li> + * </ul> + * + * @see <a href="https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test">Kolmogorov Smirnov test</a> + */ +public class RandomSamplerTest { + private final static int SOURCE_SIZE = 1; + private static KolmogorovSmirnovTest ksTest; + private static List<Double> source; + private final static int DEFFAULT_PARTITION_NUMBER=10; + private List<Double>[] sourcePartitions = new List[DEFFAULT_PARTITION_NUMBER]; + + @BeforeClass + public static void init() { + // initiate source data set.
+ source = new ArrayList<Double>(SOURCE_SIZE); + for (int i = 0; i < SOURCE_SIZE; i++) { + source.add((double) i); + } + + ksTest = new KolmogorovSmirnovTest(); + } + + private void initSourcePartition() { + for (int i=0; i<DEFFAULT_PARTITION_NUMBER; i++) { + sourcePartitions[i] = new LinkedList<Double>(); + } + for (int i = 0; i < SOURCE_SIZE; i++) { + int index = i % DEFFAULT_PARTITION_NUMBER; + sourcePartitions[index].add((double)i); + } + } + + @Test(expected = java.lang.IllegalArgumentException.class) + public void testBernoulliSamplerWithUnexpectedFraction1() { + verifySamplerFraction(-1, false); + } + + @Test(expected = java.lang.IllegalArgumentException.class) + public void testBernoulliSamplerWithUnexpectedFraction2() { + verifySamplerFraction(2, false); + } + + @Test + public void testBernoulliSamplerFraction() { + verifySamplerFraction(0.01, false); + verifySamplerFraction(0.05, false); + verifySamplerFraction(0.1, false); + verifySamplerFraction(0.3, false); + verifySamplerFraction(0.5, false); + verifySamplerFraction(0.854, false); +
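The test Javadoc above checks randomness by comparing the distribution of gaps between sampled positions against a reference sampler with a Kolmogorov-Smirnov test; the actual test delegates that to Commons Math's `KolmogorovSmirnovTest`. To make the compared quantity concrete, here is a hand-rolled sketch (hypothetical class `GapKsSketch`) that turns sorted sample positions into gaps and computes the two-sample KS statistic D, the largest vertical distance between the two empirical CDFs. It computes only D, not a p-value, and assumes both inputs are non-empty.

```java
import java.util.Arrays;

/** Hypothetical sketch of the quantities behind a gap-based KS comparison. */
final class GapKsSketch {

    /** Gaps between consecutive positions of a sorted sample. */
    static double[] gaps(int[] sortedPositions) {
        double[] result = new double[Math.max(0, sortedPositions.length - 1)];
        for (int i = 1; i < sortedPositions.length; i++) {
            result[i - 1] = sortedPositions[i] - sortedPositions[i - 1];
        }
        return result;
    }

    /** Two-sample Kolmogorov-Smirnov statistic D = max over x of |F_a(x) - F_b(x)|. */
    static double ksStatistic(double[] a, double[] b) {
        double[] x = a.clone();
        double[] y = b.clone();
        Arrays.sort(x);
        Arrays.sort(y);
        int i = 0;
        int j = 0;
        double d = 0.0;
        while (i < x.length && j < y.length) {
            double v = Math.min(x[i], y[j]);
            // step both empirical CDFs past every value equal to v, so ties are handled
            while (i < x.length && x[i] <= v) i++;
            while (j < y.length && y[j] <= v) j++;
            d = Math.max(d, Math.abs((double) i / x.length - (double) j / y.length));
        }
        // once one sample is exhausted its CDF is 1 and the gap can only shrink, so d is final
        return d;
    }
}
```

A sampler that picks elements uniformly should produce a gap distribution close to the reference one, so D stays small and the test fails to reject the null hypothesis.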
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14693288#comment-14693288 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36844879 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/operators/util/RandomSamplerTest.java --- @@ -0,0 +1,425 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +/** + * This test suite try to verify whether all the random samplers work as we expected, which mainly focus on: + * <ul> + * <li>Does sampled result fit into input parameters? we check parameters like sample fraction, sample size, + * w/o replacement, and so on.</li> + * <li>Does sampled result randomly selected? we verify this by measure how much does it distributed on source data. + * Run Kolmogorov-Smirnov (KS) test between the random samplers and default reference samplers which is distributed + * well-proportioned on source data. If random sampler select elements randomly from source, it would distributed + * well-proportioned on source data as well. The KS test will fail to strongly reject the null hypothesis that + * the distributions of sampling gaps are the same. + * </li> + * </ul> + * + * @see <a href="https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test">Kolmogorov Smirnov test</a> + */ +public class RandomSamplerTest { + private final static int SOURCE_SIZE = 1; + private static KolmogorovSmirnovTest ksTest; + private static List<Double> source; + private final static int DEFFAULT_PARTITION_NUMBER=10; + private List<Double>[] sourcePartitions = new List[DEFFAULT_PARTITION_NUMBER]; + + @BeforeClass + public static void init() { + // initiate source data set.
+ source = new ArrayList<Double>(SOURCE_SIZE); + for (int i = 0; i < SOURCE_SIZE; i++) { + source.add((double) i); + } + + ksTest = new KolmogorovSmirnovTest(); + } + + private void initSourcePartition() { + for (int i=0; i<DEFFAULT_PARTITION_NUMBER; i++) { + sourcePartitions[i] = new LinkedList<Double>(); + } + for (int i = 0; i < SOURCE_SIZE; i++) { + int index = i % DEFFAULT_PARTITION_NUMBER; + sourcePartitions[index].add((double)i); + } + } + + @Test(expected = java.lang.IllegalArgumentException.class) + public void testBernoulliSamplerWithUnexpectedFraction1() { + verifySamplerFraction(-1, false); + } + + @Test(expected = java.lang.IllegalArgumentException.class) + public void testBernoulliSamplerWithUnexpectedFraction2() { + verifySamplerFraction(2, false); + } + + @Test + public void testBernoulliSamplerFraction() { + verifySamplerFraction(0.01, false); + verifySamplerFraction(0.05, false); + verifySamplerFraction(0.1, false); + verifySamplerFraction(0.3, false); + verifySamplerFraction(0.5, false); + verifySamplerFraction(0.854, false); +
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14692943#comment-14692943 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-130175707 Hi @tillrohrmann, it's ready for review now. Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative or exact size of the sample, set a seed for reproducibility, and support sampling within iterations. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14680223#comment-14680223 ] ASF GitHub Bot commented on FLINK-1901: --- Github user thvasilo commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-129481478 Hello @ChengXiangLi, [this](http://machinelearning.wustl.edu/mlpapers/papers/icml2013_meng13a) paper may help in deciding which algorithm to use for exact-size sampling. It provides an implementation specifically designed for a MapReduce environment. Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14681142#comment-14681142 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-129684685 Thanks, @thvasilo. That paper introduces a random sampling algorithm that extends the one I described before. It uses two thresholds to filter elements before sorting: if an element's weight is above the upper threshold, it will be included in the final top K elements with very high probability; if an element's weight is below the lower threshold, it will be excluded from the final top K elements with very high probability. Accepting a small failure probability, we can therefore take the elements whose weight is above the upper threshold, drop the elements whose weight is below the lower threshold, and sort only the elements whose weight lies between the two thresholds. This is a very good algorithm and I will note it down for further improvement, but I don't want to implement it right away. This PR is already large enough, so I would like to leave all algorithm optimizations for the future and keep only basic implementations of the sampling algorithms here, making sure they are simple, easy to understand and correct, so that they can serve as the performance baseline for further improvement. Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
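A rough sketch of the two-threshold idea described above, assuming uniform random weights and thresholds supplied by the caller. The paper derives the thresholds from the sample size, the input size and a failure probability; that derivation is not reproduced here. Elements above the upper threshold are accepted immediately, elements below the lower threshold are dropped, and only the band in between is sorted. If the thresholds turn out too tight for a given draw, the sketch returns `null` so the caller can retry. `ThresholdTopKSketch` is a hypothetical name, not code from the PR.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch: top-k by random weight, sorting only the weights between two thresholds. */
final class ThresholdTopKSketch {

    /** An element paired with its random weight. */
    private static final class Weighted<T> {
        final T element;
        final double weight;

        Weighted(T element, double weight) {
            this.element = element;
            this.weight = weight;
        }
    }

    /**
     * Returns the k elements with the largest uniform random weights, sorting only the
     * undecided elements whose weight lies in [lower, upper]. Returns null when the
     * thresholds were too tight for this draw, so the caller can retry.
     */
    static <T> List<T> sample(List<T> input, int k, double lower, double upper, long seed) {
        if (k < 0 || lower > upper) {
            throw new IllegalArgumentException("need k >= 0 and lower <= upper");
        }
        Random random = new Random(seed);
        List<T> accepted = new ArrayList<>();
        List<Weighted<T>> waitlist = new ArrayList<>();
        for (T element : input) {
            double weight = random.nextDouble();
            if (weight > upper) {
                accepted.add(element); // above the upper threshold: taken without sorting
            } else if (weight >= lower) {
                waitlist.add(new Weighted<>(element, weight)); // undecided: kept for sorting
            }
            // below the lower threshold: dropped without sorting
        }
        if (accepted.size() > k || accepted.size() + waitlist.size() < k) {
            return null; // thresholds failed for this draw
        }
        waitlist.sort(Comparator.comparingDouble((Weighted<T> entry) -> entry.weight).reversed());
        for (int i = 0; accepted.size() < k; i++) {
            accepted.add(waitlist.get(i).element);
        }
        return accepted;
    }
}
```

With `lower = 0` and `upper = 1` nothing is filtered and the sketch degenerates to a full sort, which makes a convenient correctness baseline: for the same seed, any successful filtered run selects the same set of elements.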
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14679955#comment-14679955 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-129409569 Thanks for the input, @tillrohrmann and @sachingoel0101. I would like to implement fixed-size sampling with only one pass through the source dataset: when users sample a dataset, the dataset is usually quite large, so passing over it multiple times would add a lot of cost. The basic idea of my solution for fixed-size sampling in a distributed stream is to generate a random number for each input element as its weight and select the top K elements with the largest weights. Since the weights are generated randomly, the selected top K elements are a random sample. You can find more details in the code and Javadoc. Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
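The random-weight idea above can be sketched on a single JVM, assuming each partition is an `Iterable` and the merge happens in one place. Every partition keeps the K elements with the largest weights in a size-bounded min-heap during one pass, and the global sample is the top K of all local candidates. This works because any element in the global top K must also be in its own partition's top K. `RandomWeightTopK` and its methods are hypothetical names, not the PR's API, and the sketch samples without replacement.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

/** Hypothetical sketch: one-pass fixed-size sampling by keeping the k largest random weights. */
final class RandomWeightTopK {

    /** An element tagged with the random weight it was assigned. */
    static final class Weighted<T> {
        final T element;
        final double weight;

        Weighted(T element, double weight) {
            this.element = element;
            this.weight = weight;
        }
    }

    /** One pass over a partition: keep the k elements with the largest random weights. */
    static <T> List<Weighted<T>> localTopK(Iterable<T> partition, int k, Random random) {
        PriorityQueue<Weighted<T>> heap = newMinHeap();
        for (T element : partition) {
            offer(heap, new Weighted<>(element, random.nextDouble()), k);
        }
        return new ArrayList<>(heap);
    }

    /** Merge step: the global top k is the top k of all local top-k candidates. */
    static <T> List<T> mergeTopK(List<List<Weighted<T>>> candidates, int k) {
        PriorityQueue<Weighted<T>> heap = newMinHeap();
        for (List<Weighted<T>> local : candidates) {
            for (Weighted<T> candidate : local) {
                offer(heap, candidate, k);
            }
        }
        List<T> result = new ArrayList<>();
        for (Weighted<T> kept : heap) {
            result.add(kept.element);
        }
        return result;
    }

    /** Min-heap ordered by weight, so the smallest kept weight sits at the head. */
    private static <T> PriorityQueue<Weighted<T>> newMinHeap() {
        return new PriorityQueue<>(Comparator.comparingDouble((Weighted<T> w) -> w.weight));
    }

    /** Keeps at most k candidates, evicting the smallest weight when a larger one arrives. */
    private static <T> void offer(PriorityQueue<Weighted<T>> heap, Weighted<T> candidate, int k) {
        if (k <= 0) {
            return;
        }
        if (heap.size() < k) {
            heap.add(candidate);
        } else if (candidate.weight > heap.peek().weight) {
            heap.poll();
            heap.add(candidate);
        }
    }
}
```

Each partition sends at most K candidates to the merge step, so the data is read only once regardless of its size.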
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14662173#comment-14662173 ] ASF GitHub Bot commented on FLINK-1901: --- Github user sachingoel0101 commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-128781613 I have worked on this problem before. The idea is to divide the data into blocks and find the probability of selecting an element from each block. Suppose there are blocks B_1, B_2, ..., B_N with probabilities P_1, P_2, ..., P_N; then you sample k points by first sampling from the distribution {P_1, P_2, ..., P_N} to find the number of elements you require from each block. After that you select the required number of points from each block and take the union. It is pretty easy to implement in a shared-memory system, but in a distributed system it is hard. I tried the following approach some time ago, although I didn't quite finish working on it: blockedData = Data -> (block_id, data); blockNumbers = (block_id, data) -> (block_id, count); (1...k) -> (list of block ids we'll be sampling from). After this, I tried broadcasting the list and selecting the required number of elements from each block, which can be done quite easily. But what if k is very large? Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
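The first step of the block-based approach above, deciding how many of the k points come from each block, can be sketched as k independent draws of a block id from {P_1, ..., P_N} with P_i = count_i / total. This assumes the per-block counts are already known (the `blockNumbers` step) and uses a linear scan per draw; binary search over prefix sums would be faster with many blocks. `BlockAllocation` is a hypothetical name. This multinomial allocation corresponds to sampling with replacement; a without-replacement variant would need a multivariate hypergeometric draw so that no block is asked for more elements than it holds.

```java
import java.util.Random;

/** Hypothetical sketch: split a sample of size k across blocks in proportion to block sizes. */
final class BlockAllocation {

    /**
     * Returns how many of the k sampled points should come from each block, by drawing
     * k block ids independently from P_i = blockCounts[i] / total.
     */
    static int[] allocate(long[] blockCounts, int k, Random random) {
        long total = 0;
        for (long count : blockCounts) {
            if (count < 0) {
                throw new IllegalArgumentException("block counts must be non-negative");
            }
            total += count;
        }
        if (total == 0) {
            throw new IllegalArgumentException("all blocks are empty");
        }
        int[] perBlock = new int[blockCounts.length];
        for (int draw = 0; draw < k; draw++) {
            // uniform position in [0, total); the clamp guards against floating-point rounding
            long position = Math.min(total - 1, (long) (random.nextDouble() * total));
            int block = 0;
            while (position >= blockCounts[block]) { // skip whole blocks until the position falls inside one
                position -= blockCounts[block];
                block++;
            }
            perBlock[block]++;
        }
        return perBlock;
    }
}
```

The resulting array is small (one entry per block), which is what makes broadcasting it cheap even when k itself is large.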
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661768#comment-14661768 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-128690389 The current state with the `RichMapPartitionFunctions` looks good to me :+1: You're right that users usually want to fix the size of the whole sample. An easy solution could be to assign each item an index, see `DataSetUtils.zipWithIndex`. Then we can compute the maximum index (which is effectively counting the data set elements). This gives us the range from which we have to sample. By generating a parallel sequence whose length is our sample size with `env.generateSequence(maxIndex)`, we could then sample from `[0, maxIndex]`. Finally, we would have to join this data set with the original data set that has the indices assigned. There are probably more efficient algorithms out there than this one. Just ping me when you've found a solution for the problem. Looking forward to reviewing it :-) Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
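The index-based plan above (assign indices, count, draw indices, join) can be sketched locally with a plain Java list standing in for the data set, where the list position plays the role of the index `zipWithIndex` would assign. Drawing k distinct indices here uses Floyd's algorithm, which is a choice made for this sketch and not part of the suggestion; `IndexJoinSampling` is a hypothetical name. In the distributed version each step would be a separate operator, so the data would be read more than once.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

/** Hypothetical sketch: fixed-size sampling by drawing indices and "joining" them back. */
final class IndexJoinSampling {

    /** Floyd's algorithm: k distinct indices drawn uniformly from [0, n). */
    static Set<Integer> distinctIndices(int n, int k, Random random) {
        if (k < 0 || k > n) {
            throw new IllegalArgumentException("need 0 <= k <= n");
        }
        Set<Integer> chosen = new HashSet<>();
        for (int j = n - k; j < n; j++) {
            int t = random.nextInt(j + 1); // uniform in [0, j]
            // if t is taken, j itself cannot be taken yet, so use it instead
            chosen.add(chosen.contains(t) ? j : t);
        }
        return chosen;
    }

    /** Stand-in for the join: keep elements whose index (as zipWithIndex would assign) was drawn. */
    static <T> List<T> sample(List<T> data, int k, long seed) {
        int count = data.size(); // the "count / max index" step
        Set<Integer> wanted = distinctIndices(count, k, new Random(seed));
        List<T> result = new ArrayList<>();
        for (int index = 0; index < count; index++) {
            if (wanted.contains(index)) {
                result.add(data.get(index));
            }
        }
        return result;
    }
}
```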
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14661258#comment-14661258 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-128580607 Hi @tillrohrmann, the current implementation of fixed-size sampling generates a random fixed-size sample for each partition rather than for the whole dataset, while users will most likely expect the latter. I'm researching how to randomly sample a fixed number of elements from a distributed data stream; I think we can pause this PR review until I merge the previous fix. Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14654757#comment-14654757 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36267430 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +/** + * A simple in memory implementation of Reservoir Sampling with replacement, and with only one pass through + * the input iteration whose size is unpredictable. + * This implementation refers to the algorithm described in Reservoir-based Random Sampling with Replacement + * from Data Stream. + * + * @param <T> the type of sample.
+ */ +public class ReservoirSamplerWithReplacement<T> extends RandomSampler<T> { + private final int numSamples; + private final Random random; + private PoissonDistribution poissonDistribution; + private List<Integer> positions; + + /** +* Create a reservoir sampler with fixed sample size and default random number generator. +* +* @param numSamples number of samples to retain in reservoir, must be non-negative. +*/ + public ReservoirSamplerWithReplacement(int numSamples) { + this(numSamples, new Random()); + } + + /** +* Create a reservoir sampler with fixed sample size and random number generator seed. +* +* @param numSamples number of samples to retain in reservoir, must be non-negative. +* @param seed random number generator seed +*/ + public ReservoirSamplerWithReplacement(int numSamples, long seed) { + this(numSamples, new Random(seed)); + } + + /** +* Create a reservoir sampler with fixed sample size and random number generator. +* +* @param numSamples number of samples to retain in reservoir, must be non-negative. --- End diff -- Fixed. Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
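The PR's sampler follows the paper cited in its Javadoc and uses a Poisson distribution. As a simpler point of comparison, the sketch below swaps in a different, well-known technique: k independent single-element reservoirs, where slot s is overwritten by the i-th element with probability 1/i. Each slot then holds a uniformly random element of the stream, independently of the other slots, which is sampling with replacement in one pass over input of unknown size. It costs k random draws per element, so it is a baseline rather than an efficient implementation; `ReservoirWithReplacementSketch` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch: reservoir sampling with replacement as k independent size-1 reservoirs. */
final class ReservoirWithReplacementSketch {

    /**
     * One pass over input of unknown size. Slot s holds a uniformly random element of the
     * prefix seen so far, independently of the other slots. Empty input yields an empty list.
     */
    static <T> List<T> sample(Iterator<T> input, int numSamples, long seed) {
        if (numSamples < 0) {
            throw new IllegalArgumentException("numSamples must be non-negative");
        }
        Random random = new Random(seed);
        List<T> reservoir = new ArrayList<>(numSamples);
        long seen = 0;
        while (input.hasNext()) {
            T element = input.next();
            seen++;
            if (seen == 1) {
                // the first element fills every slot
                for (int s = 0; s < numSamples; s++) {
                    reservoir.add(element);
                }
            } else {
                for (int s = 0; s < numSamples; s++) {
                    // overwrite slot s with probability 1 / seen
                    if (random.nextDouble() * seen < 1.0) {
                        reservoir.set(s, element);
                    }
                }
            }
        }
        return reservoir;
    }
}
```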
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14654760#comment-14654760 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r36267541 --- Diff: pom.xml --- @@ -224,6 +224,12 @@ under the License. <version>3.2.1</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + <version>3.5</version> + </dependency> --- End diff -- I didn't find the flink-dist/NOTICE and flink-dist/LICENSE files. There are NOTICE and LICENSE files in the Flink root directory, but I couldn't find the right place to add anything related to commons-math3, so I'll leave this to you, Till. Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649351#comment-14649351 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984921 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/RandomSampler.java --- @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import java.util.Iterator; + +public abstract class RandomSampler<T> { --- End diff -- JavaDocs are missing Create sample operator for Dataset -- Key: FLINK-1901 URL: https://issues.apache.org/jira/browse/FLINK-1901 Project: Flink Issue Type: Improvement Components: Core Reporter: Theodore Vasiloudis Assignee: Chengxiang Li In order to be able to implement Stochastic Gradient Descent and a number of other machine learning algorithms we need to have a way to take a random sample from a Dataset. We need to be able to sample with or without replacement from the Dataset, choose the relative size of the sample, and set a seed for reproducibility.
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649332#comment-14649332 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984232 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/BernoulliSampler.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.Iterator; +import java.util.Random; + +/** + * A sampler implementation built upon Bernoulli Trail. For sample without replacement, each element sample choice is just a bernoulli trail. + * + * @param <T> The type of sample. + */ +public class BernoulliSampler<T> extends RandomSampler<T> { + + private final double fraction; + private final Random random; + + /** +* Create a bernoulli sampler sample fraction and default random number generator. +* +* @param fraction sample fraction, aka the bernoulli sampler possibility. 
+*/ + public BernoulliSampler(double fraction) { + this(fraction, new Random()); + } + + /** +* Create a bernoulli sampler sample fraction and random number generator seed. +* +* @param fraction sample fraction, aka the bernoulli sampler possibility. +* @param seed random number generator seed. +*/ + public BernoulliSampler(double fraction, long seed) { + this(fraction, new Random(seed)); + } + + /** +* Create a bernoulli sampler sample fraction and random number generator. +* +* @param fraction sample fraction, aka the bernoulli sampler possibility. +* @param random the random number generator. +*/ + public BernoulliSampler(double fraction, Random random) { + Preconditions.checkArgument(fraction >= 0 && fraction <= 1.0d, "fraction fraction must between [0, 1]."); + this.fraction = fraction; + this.random = random; + } + + /** +* Sample the input elements, for each input element, take a Bernoulli Trail for sample. +* +* @param input elements to be sampled. +* @return the sampled result which is lazy computed upon input elements. +*/ + @Override + public Iterator<T> sample(final Iterator<T> input) { + if (fraction == 0) { + return EMPTY_ITERABLE; + } + + return new SampledIterator<T>() { + T current; + + @Override + public boolean hasNext() { + if (current == null) { + while (input.hasNext()) { + T element = input.next(); + if (random.nextDouble() <= fraction) { + current = element; + return true; + } + } + current = null; + return false; + } + return false; --- End diff -- I think, if I'm not mistaken, that `hasNext` has to be idempotent. Thus it should return `true` if `current != null`.
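The idempotency point can be illustrated with a minimal stand-alone sketch. It assumes a hypothetical `IdempotentBernoulliSampler` class that uses only the JDK, so it has no Guava `Preconditions` and no Flink `RandomSampler`/`SampledIterator` base types. `hasNext()` advances the input only while no accepted element is buffered. `next()` hands out the buffered element and clears the buffer, so repeated `hasNext()` calls give the same answer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;

/**
 * Hypothetical stand-alone Bernoulli sampler (not Flink's class): each input element is
 * kept independently with probability {@code fraction}.
 */
final class IdempotentBernoulliSampler<T> {
    private final double fraction;
    private final Random random;

    IdempotentBernoulliSampler(double fraction, Random random) {
        // The negated form also rejects NaN.
        if (!(fraction >= 0.0d && fraction <= 1.0d)) {
            throw new IllegalArgumentException("fraction must be in [0, 1], was " + fraction);
        }
        this.fraction = fraction;
        this.random = random;
    }

    Iterator<T> sample(final Iterator<T> input) {
        return new Iterator<T>() {
            private T current;        // buffered element that passed its Bernoulli trial
            private boolean buffered; // explicit flag, so a null element is not mistaken for "empty"

            @Override
            public boolean hasNext() {
                // Advance the input only while nothing is buffered, so repeated
                // hasNext() calls return the same answer without skipping elements.
                while (!buffered && input.hasNext()) {
                    T element = input.next();
                    if (random.nextDouble() < fraction) {
                        current = element;
                        buffered = true;
                    }
                }
                return buffered;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T result = current;
                current = null;
                buffered = false;
                return result;
            }
        };
    }

    public static void main(String[] args) {
        IdempotentBernoulliSampler<Integer> sampler =
                new IdempotentBernoulliSampler<>(0.5, new Random(42));
        Iterator<Integer> it = sampler.sample(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).iterator());
        List<Integer> kept = new ArrayList<>();
        while (it.hasNext() && it.hasNext()) { // the second call must not consume anything
            kept.add(it.next());
        }
        System.out.println(kept);
    }
}
```

Using `random.nextDouble() < fraction` keeps every element for `fraction == 1.0`, because `nextDouble()` returns values in `[0, 1)`. It keeps none for `fraction == 0.0`, so no special case for an empty result is needed.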
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649419#comment-14649419 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35988629 --- Diff: pom.xml --- @@ -224,6 +224,12 @@ under the License. <version>3.2.1</version> </dependency> + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + <version>3.5</version> + </dependency> --- End diff -- For that we have to add an entry in the `flink-dist/NOTICE` and `flink-dist/LICENSE` files. But I can do that when merging the PR.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649341#comment-14649341 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984449 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.Iterator; + +/** + * A sampler implementation based on Poisson Distribution. 
While sample elements with replacement, + * the picked number of each element follow poisson distribution, so we could use poisson distribution --- End diff -- Typo: element follows a given poisson distribution
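The following is a sketch of the sampling scheme that Javadoc describes, assuming a hypothetical stand-alone `PoissonReplacementSampler`. Each element is emitted `k` times, with `k` drawn from a Poisson distribution whose mean is the sample fraction. The expected sample size is therefore `fraction * n`.

The PR imports commons-math3's `PoissonDistribution`. To stay JDK-only, this sketch swaps in Knuth's multiplication method, which is only suitable for small means. It also collects the sample eagerly into a list rather than returning a lazy iterator.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

/**
 * Hypothetical sketch (not the PR's PoissonSampler): sampling with replacement by
 * emitting each element k times, where k ~ Poisson(fraction).
 */
final class PoissonReplacementSampler {

    /** Knuth's method: count extra uniform draws until their product drops to e^(-mean) or below. */
    static int nextPoisson(double mean, Random random) {
        double limit = Math.exp(-mean); // underflows for very large means; fine for small fractions
        int k = 0;
        double product = random.nextDouble();
        while (product > limit) {
            k++;
            product *= random.nextDouble();
        }
        return k;
    }

    /** Eagerly collects the sample; each element appears a Poisson(fraction) number of times. */
    static <T> List<T> sample(Iterator<T> input, double fraction, Random random) {
        // With replacement an element can be picked more than once, so any non-negative fraction is allowed.
        if (!(fraction >= 0.0d)) {
            throw new IllegalArgumentException("fraction must be non-negative, was " + fraction);
        }
        List<T> result = new ArrayList<>();
        while (input.hasNext()) {
            T element = input.next();
            int copies = nextPoisson(fraction, random);
            for (int i = 0; i < copies; i++) {
                result.add(element);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(sample(data.iterator(), 1.0, new Random(42)));
    }
}
```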
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649337#comment-14649337 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35984385 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/PoissonSampler.java --- @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.Iterator; + +/** + * A sampler implementation based on Poisson Distribution. 
While sample elements with replacement, --- End diff -- While sampling elements
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649372#comment-14649372 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35985704 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithoutReplacement.java --- @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Random; + +/** + * A simple in memory implementation of Reservoir Sampling without replacement, and with only one pass through + * the input iteration whose size is unpredictable. + * This implementation refers to the Algorithm R described in "Random Sampling with a Reservoir" Vitter, 1985. --- End diff -- Maybe add again a link to the paper if available.
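Here is a minimal sketch of Algorithm R as that Javadoc describes it. It uses a hypothetical `AlgorithmR` helper rather than the PR's `ReservoirSamplerWithoutReplacement`. The first `numSamples` elements fill the reservoir. After that, the element at position `seen` overwrites a uniformly chosen slot with probability `numSamples / seen`. This leaves every input element in the final sample with equal probability, in a single pass over input of unknown size.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

/** Hypothetical sketch of Vitter's Algorithm R: a uniform fixed-size sample without replacement in one pass. */
final class AlgorithmR {

    static <T> List<T> sample(Iterator<T> input, int numSamples, Random random) {
        if (numSamples < 0) {
            throw new IllegalArgumentException("numSamples must be non-negative, was " + numSamples);
        }
        List<T> reservoir = new ArrayList<>(numSamples);
        int seen = 0; // int counter: this sketch assumes fewer than 2^31 input elements
        while (input.hasNext()) {
            T element = input.next();
            seen++;
            if (reservoir.size() < numSamples) {
                reservoir.add(element);           // fill the reservoir with the first numSamples elements
            } else {
                int slot = random.nextInt(seen);  // uniform in [0, seen)
                if (slot < numSamples) {
                    reservoir.set(slot, element); // happens with probability numSamples / seen
                }
            }
        }
        return reservoir;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        System.out.println(sample(data.iterator(), 3, new Random(42)));
    }
}
```

If the input has fewer than `numSamples` elements, the whole input is returned in its original order.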
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649358#comment-14649358 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35985033 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +/** + * A simple in memory implementation of Reservoir Sampling with replacement, and with only one pass through + * the input iteration whose size is unpredictable. + * This implementation refers to the algorithm described in "Reservoir-based Random Sampling with Replacement + * from Data Stream". --- End diff -- Maybe put a link to the paper in here.
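One simple way to get a one-pass, fixed-size sample with replacement is sketched below with a hypothetical `ReservoirWithReplacementSketch` class. It keeps `numSamples` independent single-element reservoirs, and the `seen`-th element replaces each slot with probability `1 / seen`. Each slot therefore ends up holding a uniformly random input element, independently of the other slots.

This costs one random draw per slot per element. The PR imports `PoissonDistribution`, presumably to decide the number of replaced slots in one step instead. This sketch does not attempt that optimization and does not claim to reproduce the paper's exact algorithm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

/**
 * Hypothetical sketch: a fixed-size sample with replacement in one pass, built from
 * numSamples independent single-element reservoirs.
 */
final class ReservoirWithReplacementSketch {

    static <T> List<T> sample(Iterator<T> input, int numSamples, Random random) {
        if (numSamples < 0) {
            throw new IllegalArgumentException("numSamples must be non-negative, was " + numSamples);
        }
        List<T> slots = new ArrayList<>(numSamples);
        int seen = 0; // assumes fewer than 2^31 input elements
        while (input.hasNext()) {
            T element = input.next();
            seen++;
            for (int i = 0; i < numSamples; i++) {
                if (seen == 1) {
                    slots.add(element);            // the first element fills every slot
                } else if (random.nextInt(seen) == 0) {
                    slots.set(i, element);         // replace slot i with probability 1 / seen
                }
            }
        }
        return slots;                              // empty if the input was empty
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c");
        System.out.println(sample(data.iterator(), 5, new Random(42)));
    }
}
```

The slot update is the `k = 1` case of reservoir sampling applied to each slot. Slot `i` holds element `j` at the end with probability `(1/j) * (j/n) = 1/n`.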
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649360#comment-14649360 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/949#discussion_r35985116 --- Diff: flink-core/src/main/java/org/apache/flink/api/common/operators/util/ReservoirSamplerWithReplacement.java --- @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.api.common.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.commons.math3.distribution.PoissonDistribution; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; + +/** + * A simple in memory implementation of Reservoir Sampling with replacement, and with only one pass through + * the input iteration whose size is unpredictable. + * This implementation refers to the algorithm described in "Reservoir-based Random Sampling with Replacement + * from Data Stream". + * + * @param <T> the type of sample. 
+ */ +public class ReservoirSamplerWithReplacement<T> extends RandomSampler<T> { + private final int numSamples; + private final Random random; + private PoissonDistribution poissonDistribution; + private List<Integer> positions; + + /** +* Create a reservoir sampler with fixed sample size and default random number generator. +* +* @param numSamples number of samples to retain in reservoir, must be non-negative. +*/ + public ReservoirSamplerWithReplacement(int numSamples) { + this(numSamples, new Random()); + } + + /** +* Create a reservoir sampler with fixed sample size and random number generator seed. +* +* @param numSamples number of samples to retain in reservoir, must be non-negative. +* @param seed random number generator seed +*/ + public ReservoirSamplerWithReplacement(int numSamples, long seed) { + this(numSamples, new Random(seed)); + } + + /** +* Create a reservoir sampler with fixed sample size and random number generator. +* +* @param numSamples number of samples to retain in reservoir, must be non-negative. --- End diff -- @param is missing
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14649429#comment-14649429 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-126740651 Thanks for your contribution @ChengXiangLi. The code is really well tested and well structured. Great work :-) I only had some minor comments. There is, however, one thing I'm not so sure about. With the current implementation, all parallel tasks of the sampling operator will get the same random generator/seed value. Thus, every node will generate the same sequence of random numbers. I think this can have a negative influence on the sampling. What we could do is use `RichMapPartitionFunction` instead of `MapPartitionFunction`. With the rich function, we have access to the subtask index, given by `getRuntimeContext().getIndexOfThisSubtask()`, which we could use to modify the initial seed. Alternatively, we could create the random number generator in the `open` method (this method is executed on the TaskManager). Assuming that the clocks are not completely synchronized and that the individual tasks are not instantiated at exactly the same time, this could give us less correlated random number sequences. What do you think?
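The seeding concern can be reproduced without Flink in a small simulation. The class `SubtaskSeeding` and the seed derivation `userSeed + subtaskIndex` are assumptions for illustration. In an actual job, the index would come from `getRuntimeContext().getIndexOfThisSubtask()` inside a `RichMapPartitionFunction`, as the comment suggests. With a shared seed, every simulated subtask draws the identical sequence; with derived seeds, the sequences differ.

```java
import java.util.Arrays;
import java.util.Random;

/**
 * Hypothetical simulation of parallel sampling subtasks: compares giving every
 * subtask the same seed with deriving a per-subtask seed from the subtask index.
 */
final class SubtaskSeeding {

    /** Assumed derivation: user seed plus subtask index (one simple option, not a Flink API). */
    static long seedForSubtask(long userSeed, int subtaskIndex) {
        return userSeed + subtaskIndex;
    }

    /** The first {@code count} random numbers a subtask would draw with the given seed. */
    static long[] firstDraws(long seed, int count) {
        Random random = new Random(seed);
        long[] draws = new long[count];
        for (int i = 0; i < count; i++) {
            draws[i] = random.nextLong();
        }
        return draws;
    }

    public static void main(String[] args) {
        long userSeed = 42L;
        for (int subtask = 0; subtask < 3; subtask++) {
            System.out.println("subtask " + subtask
                    + " shared seed: " + Arrays.toString(firstDraws(userSeed, 2))
                    + " derived seed: " + Arrays.toString(firstDraws(seedForSubtask(userSeed, subtask), 2)));
        }
    }
}
```

A derived seed is only reproducible across runs if each input split is processed by the same subtask index every time.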
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14647517#comment-14647517 ] ASF GitHub Bot commented on FLINK-1901: --- Github user tillrohrmann commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-126296058 Thanks for your contribution @ChengXiangLi. I'll try to review your PR tomorrow morning.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645658#comment-14645658 ] ASF GitHub Bot commented on FLINK-1901: --- GitHub user ChengXiangLi opened a pull request: https://github.com/apache/flink/pull/949 [FLINK-1901] [core] Create sample operator for Dataset. This PR includes: 1. Four random sampler implementations for different sampling strategies. 2. A sample operator for the DataSet Java API. 3. Random sampler unit tests. 4. Sample operator Java API integration tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/ChengXiangLi/flink FLINK-1901 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/949.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #949 commit f7ba8779b8d6a6d66ab5d4e2435a70e220b1e0fc Author: chengxiang li <chengxiang...@intel.com> Date: 2015-07-22T03:38:13Z [FLINK-1901] [core] Create sample operator for Dataset.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14645803#comment-14645803 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-125898830 Previously, I planned to leave the Scala sample API to a separate PR, as I'm not very familiar with Scala. However, the failed test shows that Flink has a test to make sure the Scala and Java APIs are the same, so I will try to add the Scala API and an integration test later.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14647192#comment-14647192 ] ASF GitHub Bot commented on FLINK-1901: --- Github user ChengXiangLi commented on the pull request: https://github.com/apache/flink/pull/949#issuecomment-126184403 The Scala API and its integration test have been merged into the latest commit. @tillrohrmann, do you have time to review this PR?
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638272#comment-14638272 ] Chengxiang Li commented on FLINK-1901: -- Thanks, [~till.rohrmann], I got it now. This looks more like an iteration optimization issue to me. It assumes that the output of a static code path is always the same, so it caches that output for a potential performance improvement. This assumption is not always true, for example for a static code path with a random sampling operator, a data source reading from HBase, and so on. I think we could open a separate JIRA to address it in a uniform way instead of treating random sampling as a special case in this JIRA.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638429#comment-14638429 ] Till Rohrmann commented on FLINK-1901: -- That's a good idea to break down the task. Do you want to take the lead [~chengxiang li]?
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638448#comment-14638448 ] Till Rohrmann commented on FLINK-1901: -- Currently, to decide whether an operator is on a dynamic path or not, we look at the inputs of the operator: if any of them is dynamic, so is the current operator. The iteration {{DataSets}} ({{WorksetPlaceHolder}}, {{SolutionSetPlaceHolder}} and {{PartialSolutionPlaceHolder}}) are always dynamic. One idea is to allow other operators to be declared dynamic as well, so that they can also start a dynamic path. Afterwards, we have to make sure that an {{IterationHead}}, which kicks off the iterations, is prepended not only to the iteration {{DataSets}} but also to all the other operators which start a dynamic path.
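The propagation rule and the proposed extension can be modelled with a small graph. The `DynamicPathModel` class and its `Node` type are hypothetical and are not Flink's optimizer classes. A node is dynamic if it is an iteration placeholder, if it is explicitly declared dynamic, or if any input is dynamic. A dynamic node without dynamic inputs starts a dynamic path and, under the proposal, would need its own `IterationHead`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical model of the dynamic-path rule (not Flink's optimizer classes): a node is
 * dynamic if it is an iteration placeholder, if it is explicitly declared dynamic (the
 * proposed extension), or if any of its inputs is dynamic.
 */
final class DynamicPathModel {

    static final class Node {
        final String name;
        final boolean iterationPlaceholder; // e.g. a workset or partial-solution placeholder
        final boolean declaredDynamic;      // proposed: e.g. a sample operator inside an iteration
        final List<Node> inputs;

        Node(String name, boolean iterationPlaceholder, boolean declaredDynamic, Node... inputs) {
            this.name = name;
            this.iterationPlaceholder = iterationPlaceholder;
            this.declaredDynamic = declaredDynamic;
            this.inputs = new ArrayList<>(Arrays.asList(inputs));
        }
    }

    static boolean isDynamic(Node node) {
        if (node.iterationPlaceholder || node.declaredDynamic) {
            return true;
        }
        for (Node input : node.inputs) {
            if (isDynamic(input)) {
                return true;
            }
        }
        return false;
    }

    /** Dynamic, but no dynamic input: such a node would need its own iteration head. */
    static boolean startsDynamicPath(Node node) {
        if (!isDynamic(node)) {
            return false;
        }
        for (Node input : node.inputs) {
            if (isDynamic(input)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Node weights = new Node("weights (partial solution)", true, false);
        Node trainingData = new Node("training data source", false, false);
        Node staticSample = new Node("sample, current behaviour", false, false, trainingData);
        Node dynamicSample = new Node("sample, declared dynamic", false, true, trainingData);
        Node update = new Node("weight update", false, false, weights, dynamicSample);
        for (Node n : Arrays.asList(weights, trainingData, staticSample, dynamicSample, update)) {
            System.out.println(n.name + ": dynamic=" + isDynamic(n)
                    + ", startsDynamicPath=" + startsDynamicPath(n));
        }
    }
}
```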
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14638617#comment-14638617 ] Chengxiang Li commented on FLINK-1901: -- Thanks for the analysis, [~trohrm...@apache.org]. I've created FLINK-2396 as follow-up work to support sample and other similar datasets in iterations.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14636451#comment-14636451 ] Till Rohrmann commented on FLINK-1901: -- If you use the sampling operator this way, it works. However, your iteration data set is usually something like the weight vector of your model. You then have another training dataset from which you want to take a small sample in each iteration to update your weight vector (e.g. SGD). When you write a program like that, you'll see that the output of the sampling operator is always the same (for every iteration). The reason is that the sampling is no longer on the dynamic path of the iteration, so it is calculated only once and then cached. This is not the intended behaviour, though.
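The effect can be simulated in a few lines. The hypothetical `StaticPathCaching` helper stands in for the runtime, and a plain Bernoulli sample stands in for the sampling operator. An operator on the static path is evaluated once before the first superstep, and its cached result is reused. An operator on the dynamic path is re-evaluated in every superstep, so it draws a fresh sample each time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.Supplier;

/**
 * Hypothetical simulation (not Flink's runtime): a static-path result is computed once and
 * reused in every superstep; a dynamic-path result is recomputed each superstep.
 */
final class StaticPathCaching {

    static <R> List<R> runIterations(Supplier<R> operator, int supersteps, boolean onDynamicPath) {
        List<R> results = new ArrayList<>();
        R cached = onDynamicPath ? null : operator.get(); // static path: computed once, then cached
        for (int i = 0; i < supersteps; i++) {
            results.add(onDynamicPath ? operator.get() : cached);
        }
        return results;
    }

    /** A Bernoulli sample of {@code data}; every call draws a fresh sample. */
    static Supplier<List<Integer>> sampler(List<Integer> data, double fraction, Random random) {
        return () -> {
            List<Integer> sample = new ArrayList<>();
            for (Integer x : data) {
                if (random.nextDouble() < fraction) {
                    sample.add(x);
                }
            }
            return sample;
        };
    }

    public static void main(String[] args) {
        List<Integer> trainingData = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            trainingData.add(i);
        }
        Supplier<List<Integer>> sample = sampler(trainingData, 0.2, new Random(7));
        System.out.println("static path:  " + runIterations(sample, 3, false));
        System.out.println("dynamic path: " + runIterations(sample, 3, true));
    }
}
```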
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14635206#comment-14635206 ] Till Rohrmann commented on FLINK-1901: -- I think this solution is indeed a little too hacky. It would be very unintuitive for users to have to broadcast the iteration {{DataSet}} to the sampling operator. Furthermore, this would incur unnecessary network I/O. I think we should solve this problem properly, which means having a single sampling operator that works both inside and outside of iterations. This will also avoid code duplication.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636124#comment-14636124 ] Chengxiang Li commented on FLINK-1901: -- "Every point is sampled with probability 1/N" is only one sampling case (sampling with a fraction, without replacement). There are three other commonly used cases as well: sampling with a fraction, with replacement; sampling with a fixed size, without replacement; and sampling with a fixed size, with replacement. We should support all of them when we expose a sampling operator to users.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14636332#comment-14636332 ] Chengxiang Li commented on FLINK-1901: -- I wrote a simple example of the sampling operator [here|https://github.com/ChengXiangLi/flink/blob/FLINK-1901/flink-examples/flink-java-examples/src/main/java/org/apache/flink/examples/java/wordcount/TestSample.java]. It works as expected both inside and outside of an iteration (for example, with 1000 items and a sample fraction of 0.5, the output contains around 125 items after 3 iterations). [~trohrm...@apache.org], I'm not sure whether I understand you correctly about sampling inside an iteration; if that is the case covered by the example, it looks fine to me.
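The figure of "around 125 items" can be checked with a short calculation, assuming each of the 3 iterations applies an independent Bernoulli sample with fraction 0.5 to the previous iteration's output, so every item survives with probability 0.5^3:

```latex
p = 0.5^{3} = 0.125, \qquad X \sim \mathrm{Binomial}(n = 1000,\ p = 0.125)

\mathbb{E}[X] = np = 125, \qquad
\sigma_X = \sqrt{np(1-p)} = \sqrt{109.375} \approx 10.5
```

So outputs of roughly 115 to 135 items are what one should expect from run to run.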
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634622#comment-14634622 ] Chengxiang Li commented on FLINK-1901: -- To randomly choose a sample from a DataSet S, there are basically two kinds of sampling requirements: sampling with a fraction (e.g. randomly choosing 5% of the items in S) and sampling with a fixed size (e.g. randomly choosing 100 items from S). Besides, we do not know the size of S unless we pay the extra cost of computing it through DataSet::count().
# Sampling with a fraction
#* With replacement: the number of times each item appears in the sample can be modeled with a [Poisson distribution|https://en.wikipedia.org/wiki/Poisson_distribution] in this case, so Poisson sampling can be used to choose the sample items.
#* Without replacement: during sampling, the selection of each item in the iterator can be treated as an independent [Bernoulli trial|https://en.wikipedia.org/wiki/Bernoulli_trial].
# Sampling with a fixed size
#* Use DataSet::count() to get the data set size; together with the fixed size, this turns the problem into sampling with a fraction (which yields the requested size in expectation, not exactly).
#* [Reservoir sampling|https://en.wikipedia.org/wiki/Reservoir_sampling] is another commonly used family of algorithms for randomly choosing a sample of k items from a list S containing n items, where n is either very large or unknown. There are reservoir sampling variants for both sampling with and without replacement.
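A minimal single-machine sketch of three of the strategies listed above, assuming an in-memory `Iterator` stands in for one partition's input (no Flink API is used; the class and method names are hypothetical): Bernoulli sampling for a fraction without replacement, Poisson sampling for a fraction with replacement, and reservoir sampling (Algorithm R) for a fixed size without replacement.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

// Hypothetical single-partition samplers; not Flink's API.
final class Samplers {
    private Samplers() {}

    // Fraction, without replacement: one independent Bernoulli trial per element.
    static <T> List<T> bernoulli(Iterator<T> input, double fraction, long seed) {
        Random rnd = new Random(seed);
        List<T> out = new ArrayList<>();
        while (input.hasNext()) {
            T item = input.next();
            if (rnd.nextDouble() < fraction) {
                out.add(item);
            }
        }
        return out;
    }

    // Fraction, with replacement: each element is emitted k times, where
    // k ~ Poisson(fraction), drawn with Knuth's method (fine for small means).
    static <T> List<T> poisson(Iterator<T> input, double fraction, long seed) {
        Random rnd = new Random(seed);
        double limit = Math.exp(-fraction);
        List<T> out = new ArrayList<>();
        while (input.hasNext()) {
            T item = input.next();
            int k = 0;
            double product = rnd.nextDouble();
            while (product > limit) {
                k++;
                product *= rnd.nextDouble();
            }
            for (int i = 0; i < k; i++) {
                out.add(item);
            }
        }
        return out;
    }

    // Fixed size k, without replacement, in one pass over input of unknown length
    // (Algorithm R): keep the first k items, then let the i-th item replace a
    // uniformly chosen slot with probability k / i.
    static <T> List<T> reservoir(Iterator<T> input, int k, long seed) {
        Random rnd = new Random(seed);
        List<T> reservoir = new ArrayList<>(k);
        long seen = 0;
        while (input.hasNext()) {
            T item = input.next();
            seen++;
            if (reservoir.size() < k) {
                reservoir.add(item);
            } else {
                long j = (long) (rnd.nextDouble() * seen); // uniform in [0, seen)
                if (j < k) {
                    reservoir.set((int) j, item);
                }
            }
        }
        return reservoir;
    }
}
```

Each sampler takes an explicit seed, so repeated runs over the same input in the same order are reproducible; in a distributed job, reproducibility additionally depends on every parallel instance seeing the same input in the same order.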
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634664#comment-14634664 ] Till Rohrmann commented on FLINK-1901: -- Hi Chengxiang, good to hear that you want to work on this. I can assign the ticket to you. However, it is not only about the sampling strategy but also about the integration within Flink: we have to make sure that the sampling operator also works within iterations. This means that it has to be part of the dynamic path so that it is triggered again in every iteration. This will need a special operator type. But you can start with the sampling strategies and then continue with the iteration integration.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634800#comment-14634800 ] Till Rohrmann commented on FLINK-1901: -- Oh I forgot. Sorry. What about the iteration support?
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634797#comment-14634797 ] Sachin Goel commented on FLINK-1901: [~trohrm...@apache.org], this is already part of the k-means initialization schemes (most of it, anyway).
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634826#comment-14634826 ] Till Rohrmann commented on FLINK-1901: -- To be honest, I doubt that the sampling is executed repeatedly if you're not sampling from the iteration data set. If you use map and reduce operations that lie on the static path, their results will be computed once and cached. But it's best to check the samples. If it is possible to create a separate PR out of it, that would be great; it makes reviewing much easier.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634802#comment-14634802 ] Sachin Goel commented on FLINK-1901: I didn't think there would be any such issue, since the sampling itself uses Map and Reduce operations. I can check whether there are any issues. It certainly gave really good results, which wouldn't happen if it weren't being executed repeatedly.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634822#comment-14634822 ] Chengxiang Li commented on FLINK-1901: -- Hi, [~sachingoel0101], I didn't find any sampling-related class when searching the project for the keyword. Is the PR you mentioned still in progress? Besides ML algorithms, other use cases depend on sampling as well, such as range partitioning, and I believe sampling itself is a common operation that users may want to call directly.
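To illustrate why range partitioning depends on sampling, here is a hypothetical plain-Java sketch (not Flink's implementation): sort a sample of the keys, take evenly spaced elements as the boundaries between partitions, and route each key with a binary search over those boundaries. A representative sample gives partitions of roughly equal size without sorting the full data set.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: derive range-partition boundaries from a key sample.
final class RangeBoundaries {
    private RangeBoundaries() {}

    // Returns numPartitions - 1 boundaries taken at evenly spaced ranks of the
    // sorted sample. Assumes a non-empty sample when numPartitions > 1.
    static List<Integer> fromSample(List<Integer> sample, int numPartitions) {
        List<Integer> sorted = new ArrayList<>(sample);
        Collections.sort(sorted);
        List<Integer> bounds = new ArrayList<>();
        for (int i = 1; i < numPartitions; i++) {
            int rank = (int) ((long) i * sorted.size() / numPartitions);
            bounds.add(sorted.get(rank));
        }
        return bounds;
    }

    // Partition index = number of boundaries <= key (upper-bound binary search),
    // so the result lies in [0, bounds.size()].
    static int partitionOf(int key, List<Integer> bounds) {
        int lo = 0;
        int hi = bounds.size();
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (bounds.get(mid) <= key) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        return lo;
    }
}
```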
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14634628#comment-14634628 ] Chengxiang Li commented on FLINK-1901: -- Hi, [~tvas], I would like to contribute to this issue if no one else is working on it now.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14635102#comment-14635102 ] Sachin Goel commented on FLINK-1901: Yes, I agree. I'll see whether what I've written works with iterations out of the box. Fingers crossed. And what I meant is that in random sampling, every point is sampled with probability 1/N. I was wondering about cases where this probability is not constant across all elements but is instead a function of the element itself. I guess that's too specific to be a feature in itself.
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14635082#comment-14635082 ] Sachin Goel commented on FLINK-1901: Okay. So I checked the whole code, and the random sampling I'm using there is never used inside an iteration, so as far as that goes, there are no problems. However, it would certainly be good to have a separate random sampling module that can work on any data set. [~trohrm...@apache.org], do you think there is any use for a sampling procedure other than uniform random sampling? That is, suppose there is a function that maps every element in the data set to its probability of selection. [~chengxiang li], yes, there is an ongoing PR (https://github.com/apache/flink/pull/757). And yes, it would certainly make sense to have a generic sample function.
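The per-element selection probability described above can be sketched as independent Bernoulli trials whose success probability comes from a user-supplied function. In survey statistics this scheme is also called Poisson sampling, which is unrelated to the with-replacement Poisson sampling mentioned earlier in the thread. The class and method names below are hypothetical; no Flink API is used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.function.ToDoubleFunction;

// Hypothetical helper: each element is kept independently with a probability
// computed from the element itself, clamped to [0, 1].
final class WeightedSampling {
    private WeightedSampling() {}

    static <T> List<T> sample(Iterable<T> input,
                              ToDoubleFunction<? super T> inclusionProbability,
                              long seed) {
        Random rnd = new Random(seed);
        List<T> out = new ArrayList<>();
        for (T item : input) {
            double p = Math.min(1.0, Math.max(0.0, inclusionProbability.applyAsDouble(item)));
            if (rnd.nextDouble() < p) {
                out.add(item);
            }
        }
        return out;
    }
}
```

With a constant function this reduces to plain Bernoulli sampling; in general the expected sample size is the sum of the per-element probabilities.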
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14635096#comment-14635096 ] Till Rohrmann commented on FLINK-1901: -- The problem is that a sampling operator should also work within iterations. There is definitely a big need for this, e.g. for stochastic gradient descent. I don't really understand what you mean by your question, [~sachingoel0101].
[jira] [Commented] (FLINK-1901) Create sample operator for Dataset
[ https://issues.apache.org/jira/browse/FLINK-1901?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14499396#comment-14499396 ] Till Rohrmann commented on FLINK-1901: -- A sampling operator for use outside of iterations can be realized as a mapPartition operator. This would also allow us to do sampling without replacement. The harder problem is sampling within an iteration. Usually, one would sample from a static code path, which is executed only once. Maybe we can include the sampling operator in the dynamic code path to trigger it in every iteration.
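One way to obtain a fixed-size sample without replacement from partitioned data in a mapPartition style is sketched below in plain Java, assuming each inner list stands for one partition. This is a known random-key technique chosen for illustration, not necessarily what the eventual implementation does, and all names are hypothetical. Each partition tags its elements with uniform random keys and keeps the k largest; a final step merges the partial results and keeps the global top k. Every element among the global top k keys is also in its own partition's top k, so the result is a uniform random subset of size k.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

// Hypothetical two-phase sampler; each inner list plays the role of one partition.
final class DistributedTopKSample {
    private DistributedTopKSample() {}

    // An element paired with its uniform random key.
    static final class Keyed<T> {
        final double key;
        final T value;
        Keyed(double key, T value) { this.key = key; this.value = value; }
    }

    // Per-partition step (what a mapPartition function would do): keep the k
    // elements with the largest random keys, using a min-heap of size k.
    static <T> List<Keyed<T>> partialSample(List<T> partition, int k, long seed) {
        Random rnd = new Random(seed);
        PriorityQueue<Keyed<T>> heap =
                new PriorityQueue<Keyed<T>>(Comparator.comparingDouble((Keyed<T> e) -> e.key));
        for (T item : partition) {
            heap.add(new Keyed<>(rnd.nextDouble(), item));
            if (heap.size() > k) {
                heap.poll(); // drop the element with the smallest key
            }
        }
        return new ArrayList<>(heap);
    }

    // Final step: merge the partial results and keep the global top k keys.
    static <T> List<T> sample(List<List<T>> partitions, int k, long seed) {
        Random master = new Random(seed); // derives an independent seed per partition
        List<Keyed<T>> candidates = new ArrayList<>();
        for (List<T> partition : partitions) {
            candidates.addAll(partialSample(partition, k, master.nextLong()));
        }
        candidates.sort(Comparator.comparingDouble((Keyed<T> e) -> e.key).reversed());
        List<T> result = new ArrayList<>();
        for (int i = 0; i < Math.min(k, candidates.size()); i++) {
            result.add(candidates.get(i).value);
        }
        return result;
    }
}
```

Only k candidates per partition travel to the merge step, which keeps the network cost independent of the partition sizes.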