Repository: flink Updated Branches: refs/heads/master 65ee28c34 -> 297d75c2e
[FLINK-2832] [tests] Hardens RandomSamplerTest Increase the level of significance from p=0.01 to p=0.001 and add retry annotations to random sampler tests. This should decrease the probability of failing random sampler tests. This closes #2076 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/297d75c2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/297d75c2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/297d75c2 Branch: refs/heads/master Commit: 297d75c2e043026ccc3744d587c9ebbbd81e7d4b Parents: e1b55f0 Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Jun 6 16:19:30 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Jun 8 15:17:10 2016 +0200 ---------------------------------------------------------------------- .../api/java/sampling/RandomSamplerTest.java | 77 ++++++++++++-------- .../src/test/resources/log4j-test.properties | 28 +++++++ flink-java/src/test/resources/logback-test.xml | 35 +++++++++ 3 files changed, 111 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/297d75c2/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java ---------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java b/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java index 68f9154..228dd3a 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/sampling/RandomSamplerTest.java @@ -20,14 +20,16 @@ package org.apache.flink.api.java.sampling; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest; +import org.apache.flink.testutils.junit.RetryOnFailure; +import org.apache.flink.testutils.junit.RetryRule; import org.apache.flink.util.Preconditions; import org.junit.BeforeClass; +import org.junit.Rule; import org.junit.Test; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Set; @@ -50,29 +52,37 @@ import static org.junit.Assert.assertTrue; * @see <a href="https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test">Kolmogorov Smirnov test</a> */ public class RandomSamplerTest { - private final static int SOURCE_SIZE = 10000; - private static KolmogorovSmirnovTest ksTest; - private static List<Double> source; - private final static int DEFFAULT_PARTITION_NUMBER=10; - private List<Double>[] sourcePartitions = new List[DEFFAULT_PARTITION_NUMBER]; + + private static final int SOURCE_SIZE = 10000; + + private static final int DEFAULT_PARTITION_NUMBER = 10; + + private static final KolmogorovSmirnovTest ksTest = new KolmogorovSmirnovTest(); + + private static final List<Double> source = new ArrayList<Double>(SOURCE_SIZE); + + + @Rule + public final RetryRule retryRule = new RetryRule(); + + @SuppressWarnings({"unchecked", "rawtypes"}) + private final List<Double>[] sourcePartitions = new List[DEFAULT_PARTITION_NUMBER]; + @BeforeClass public static void init() { // initiate source data set. - source = new ArrayList<Double>(SOURCE_SIZE); for (int i = 0; i < SOURCE_SIZE; i++) { source.add((double) i); } - - ksTest = new KolmogorovSmirnovTest(); } private void initSourcePartition() { - for (int i=0; i<DEFFAULT_PARTITION_NUMBER; i++) { - sourcePartitions[i] = new LinkedList<Double>(); + for (int i = 0; i< DEFAULT_PARTITION_NUMBER; i++) { + sourcePartitions[i] = new ArrayList<Double>((int)Math.ceil((double)SOURCE_SIZE / DEFAULT_PARTITION_NUMBER)); } for (int i = 0; i< SOURCE_SIZE; i++) { - int index = i % DEFFAULT_PARTITION_NUMBER; + int index = i % DEFAULT_PARTITION_NUMBER; sourcePartitions[index].add((double)i); } } @@ -88,6 +98,7 @@ public class RandomSamplerTest { } @Test + @RetryOnFailure(times=3) public void testBernoulliSamplerFraction() { verifySamplerFraction(0.01, false); verifySamplerFraction(0.05, false); @@ -99,6 +110,7 @@ public class RandomSamplerTest { } @Test + @RetryOnFailure(times=3) public void testBernoulliSamplerDuplicateElements() { verifyRandomSamplerDuplicateElements(new BernoulliSampler<Double>(0.01)); verifyRandomSamplerDuplicateElements(new BernoulliSampler<Double>(0.1)); @@ -111,6 +123,7 @@ public class RandomSamplerTest { } @Test + @RetryOnFailure(times=3) public void testPoissonSamplerFraction() { verifySamplerFraction(0.01, true); verifySamplerFraction(0.05, true); @@ -132,6 +145,7 @@ public class RandomSamplerTest { } @Test + @RetryOnFailure(times=3) public void testBernoulliSamplerDistribution() { verifyBernoulliSampler(0.01d); verifyBernoulliSampler(0.05d); @@ -140,6 +154,7 @@ public class RandomSamplerTest { } @Test + @RetryOnFailure(times=3) public void testPoissonSamplerDistribution() { verifyPoissonSampler(0.01d); verifyPoissonSampler(0.05d); @@ -148,6 +163,7 @@ public class RandomSamplerTest { } @Test + @RetryOnFailure(times=3) public void testReservoirSamplerSampledSize() { verifySamplerFixedSampleSize(1, true); verifySamplerFixedSampleSize(10, true); @@ -164,6 +180,7 @@ public class RandomSamplerTest { } @Test + @RetryOnFailure(times=3) public void testReservoirSamplerSampledSize2() { RandomSampler<Double> sampler = new ReservoirSamplerWithoutReplacement<Double>(20000); Iterator<Double> sampled = sampler.sample(source.iterator()); @@ -171,6 +188,7 @@ public class RandomSamplerTest { } @Test + @RetryOnFailure(times=3) public void testReservoirSamplerDuplicateElements() { verifyRandomSamplerDuplicateElements(new ReservoirSamplerWithoutReplacement<Double>(100)); verifyRandomSamplerDuplicateElements(new ReservoirSamplerWithoutReplacement<Double>(1000)); @@ -178,6 +196,7 @@ public class RandomSamplerTest { } @Test + @RetryOnFailure(times=3) public void testReservoirSamplerWithoutReplacement() { verifyReservoirSamplerWithoutReplacement(100, false); verifyReservoirSamplerWithoutReplacement(500, false); @@ -186,6 +205,7 @@ public class RandomSamplerTest { } @Test + @RetryOnFailure(times=3) public void testReservoirSamplerWithReplacement() { verifyReservoirSamplerWithReplacement(100, false); verifyReservoirSamplerWithReplacement(500, false); @@ -194,6 +214,7 @@ public class RandomSamplerTest { } @Test + @RetryOnFailure(times=3) public void testReservoirSamplerWithMultiSourcePartitions1() { initSourcePartition(); @@ -204,6 +225,7 @@ public class RandomSamplerTest { } @Test + @RetryOnFailure(times=3) public void testReservoirSamplerWithMultiSourcePartitions2() { initSourcePartition(); @@ -262,7 +284,7 @@ public class RandomSamplerTest { assertTrue("There should not have duplicate element for sampler without replacement.", list.size() == set.size()); } - private int getSize(Iterator iterator) { + private int getSize(Iterator<?> iterator) { int size = 0; while (iterator.hasNext()) { iterator.next(); @@ -301,7 +323,7 @@ public class RandomSamplerTest { * If random sampler select elements randomly from source, it would distributed well-proportioned on source data as well, * so the K-S Test result would accept the first one, while reject the second one. */ - private void verifyRandomSamplerWithFraction(double fraction, RandomSampler sampler, boolean withDefaultSampler) { + private void verifyRandomSamplerWithFraction(double fraction, RandomSampler<Double> sampler, boolean withDefaultSampler) { double[] baseSample; if (withDefaultSampler) { baseSample = getDefaultSampler(fraction); @@ -318,7 +340,7 @@ public class RandomSamplerTest { * If random sampler select elements randomly from source, it would distributed well-proportioned on source data as well, * so the K-S Test result would accept the first one, while reject the second one. */ - private void verifyRandomSamplerWithSampleSize(int sampleSize, RandomSampler sampler, boolean withDefaultSampler, boolean sampleWithPartitions) { + private void verifyRandomSamplerWithSampleSize(int sampleSize, RandomSampler<Double> sampler, boolean withDefaultSampler, boolean sampleWithPartitions) { double[] baseSample; if (withDefaultSampler) { baseSample = getDefaultSampler(sampleSize); @@ -329,11 +351,11 @@ public class RandomSamplerTest { verifyKSTest(sampler, baseSample, withDefaultSampler, sampleWithPartitions); } - private void verifyKSTest(RandomSampler sampler, double[] defaultSampler, boolean expectSuccess) { + private void verifyKSTest(RandomSampler<Double> sampler, double[] defaultSampler, boolean expectSuccess) { verifyKSTest(sampler, defaultSampler, expectSuccess, false); } - private void verifyKSTest(RandomSampler sampler, double[] defaultSampler, boolean expectSuccess, boolean sampleOnPartitions) { + private void verifyKSTest(RandomSampler<Double> sampler, double[] defaultSampler, boolean expectSuccess, boolean sampleOnPartitions) { double[] sampled = getSampledOutput(sampler, sampleOnPartitions); double pValue = ksTest.kolmogorovSmirnovStatistic(sampled, defaultSampler); double dValue = getDValue(sampled.length, defaultSampler.length); @@ -345,11 +367,11 @@ public class RandomSamplerTest { } private double[] getSampledOutput(RandomSampler<Double> sampler, boolean sampleOnPartitions) { - Iterator<Double> sampled = null; + Iterator<Double> sampled; if (sampleOnPartitions) { DistributedRandomSampler<Double> reservoirRandomSampler = (DistributedRandomSampler<Double>)sampler; List<IntermediateSampleData<Double>> intermediateResult = Lists.newLinkedList(); - for (int i=0; i<DEFFAULT_PARTITION_NUMBER; i++) { + for (int i = 0; i< DEFAULT_PARTITION_NUMBER; i++) { Iterator<IntermediateSampleData<Double>> partialIntermediateResult = reservoirRandomSampler.sampleInPartition(sourcePartitions[i].iterator()); while (partialIntermediateResult.hasNext()) { intermediateResult.add(partialIntermediateResult.next()); @@ -363,8 +385,7 @@ public class RandomSamplerTest { while (sampled.hasNext()) { list.add(sampled.next()); } - double[] result = transferFromListToArrayWithOrder(list); - return result; + return transferFromListToArrayWithOrder(list); } /* @@ -393,10 +414,9 @@ public class RandomSamplerTest { private double[] getDefaultSampler(int fixSize) { Preconditions.checkArgument(fixSize > 0, "Sample fraction should be positive."); - int size = fixSize; double step = SOURCE_SIZE / (double) fixSize; - double[] defaultSampler = new double[size]; - for (int i = 0; i < size; i++) { + double[] defaultSampler = new double[fixSize]; + for (int i = 0; i < fixSize; i++) { defaultSampler[i] = Math.round(step * i); } @@ -424,9 +444,8 @@ public class RandomSamplerTest { private double[] getWrongSampler(int fixSize) { Preconditions.checkArgument(fixSize > 0, "Sample size be positive."); int halfSourceSize = SOURCE_SIZE / 2; - int size = fixSize; - double[] wrongSampler = new double[size]; - for (int i = 0; i < size; i++) { + double[] wrongSampler = new double[fixSize]; + for (int i = 0; i < fixSize; i++) { wrongSampler[i] = (double) i % halfSourceSize; } @@ -434,13 +453,13 @@ public class RandomSamplerTest { } /* - * Calculate the D value of K-S test for p-value 0.01, m and n are the sample size + * Calculate the D value of K-S test for p-value 0.001, m and n are the sample size */ private double getDValue(int m, int n) { Preconditions.checkArgument(m > 0, "input sample size should be positive."); Preconditions.checkArgument(n > 0, "input sample size should be positive."); double first = (double) m; double second = (double) n; - return 1.63 * Math.sqrt((first + second) / (first * second)); + return 1.95 * Math.sqrt((first + second) / (first * second)); } } http://git-wip-us.apache.org/repos/asf/flink/blob/297d75c2/flink-java/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-java/src/test/resources/log4j-test.properties b/flink-java/src/test/resources/log4j-test.properties new file mode 100644 index 0000000..a0fa730 --- /dev/null +++ b/flink-java/src/test/resources/log4j-test.properties @@ -0,0 +1,28 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# Set root logger level to OFF to not flood build logs +# set manually to INFO for debugging purposes +log4j.rootLogger=OFF, testlogger + +# A1 is set to be a ConsoleAppender. +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target=System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + http://git-wip-us.apache.org/repos/asf/flink/blob/297d75c2/flink-java/src/test/resources/logback-test.xml ---------------------------------------------------------------------- diff --git a/flink-java/src/test/resources/logback-test.xml b/flink-java/src/test/resources/logback-test.xml new file mode 100644 index 0000000..159f76c --- /dev/null +++ b/flink-java/src/test/resources/logback-test.xml @@ -0,0 +1,35 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<configuration> + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> + + <logger name="org.apache.flink.api.common.io.DelimitedInputFormat" level="OFF"/> + <logger name="org.apache.flink.api.common.io.FileInputFormat" level="OFF"/> + <logger name="org.apache.flink.configuration.GlobalConfiguration" level="OFF"/> + <logger name="org.apache.flink.configuration.Configuration" level="OFF"/> +</configuration> +