[BEAM-1186] Broke SampleTest into 2 test classes that support TestPipeline as a JUnit rule.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9faa5aba Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9faa5aba Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9faa5aba Branch: refs/heads/master Commit: 9faa5abae4a58eb52efe37f3c29d691f016c2595 Parents: 3aaa1e3 Author: Stas Levin <stasle...@gmail.com> Authored: Wed Dec 21 23:21:11 2016 +0200 Committer: Luke Cwik <lc...@google.com> Committed: Wed Dec 28 11:40:32 2016 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/SampleTest.java | 388 ++++++++++--------- 1 file changed, 211 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9faa5aba/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java index 9cc12d4..4e3b31c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SampleTest.java @@ -19,24 +19,22 @@ package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.sdk.TestUtils.LINES; -import static org.apache.beam.sdk.TestUtils.NO_LINES; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.TreeSet; - -import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.BigEndianIntegerCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.testing.NeedsRunner; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; @@ -47,228 +45,264 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Suite; /** * Tests for Sample transform. */ -@RunWith(JUnit4.class) +@RunWith(Suite.class) +@Suite.SuiteClasses({ + SampleTest.PickAnyTest.class, + SampleTest.MiscTest.class +}) public class SampleTest { - static final Integer[] EMPTY = new Integer[] { }; - static final Integer[] DATA = new Integer[] {1, 2, 3, 4, 5}; - static final Integer[] REPEATED_DATA = new Integer[] {1, 1, 2, 2, 3, 3, 4, 4, 5, 5}; + private static final Integer[] EMPTY = new Integer[] { }; + private static final Integer[] DATA = new Integer[] {1, 2, 3, 4, 5}; + private static final Integer[] REPEATED_DATA = new Integer[] {1, 1, 2, 2, 3, 3, 4, 4, 5, 5}; /** - * Verifies that the result of a Sample operation contains the expected number of elements, - * and that those elements are a subset of the items in expected. + * Test variations for Sample transform. */ - @SuppressWarnings("rawtypes") - public static class VerifyCorrectSample<T extends Comparable> - implements SerializableFunction<Iterable<T>, Void> { - private T[] expectedValues; - private int expectedSize; - - /** - * expectedSize is the number of elements that the Sample should contain. expected is the set - * of elements that the sample may contain. - */ - @SafeVarargs - VerifyCorrectSample(int expectedSize, T... expected) { - this.expectedValues = expected; - this.expectedSize = expectedSize; + @RunWith(Parameterized.class) + public static class PickAnyTest { + @Rule + public final transient TestPipeline p = TestPipeline.create(); + + @Parameterized.Parameters(name = "limit_{0}") + public static Iterable<Object[]> data() throws IOException { + return ImmutableList.<Object[]>builder() + .add( + new Object[] { + 0 + }, + new Object[] { + 1 + }, + new Object[] { + LINES.size() / 2 + }, + new Object[] { + LINES.size() * 2 + }, + new Object[] { + LINES.size() - 1 + }, + new Object[] { + LINES.size() + }, + new Object[] { + LINES.size() + 1 + } + ) + .build(); } - @Override - @SuppressWarnings("unchecked") - public Void apply(Iterable<T> in) { - List<T> actual = new ArrayList<>(); - for (T elem : in) { - actual.add(elem); + @Parameterized.Parameter + public int limit; + + private static class VerifyAnySample implements SerializableFunction<Iterable<String>, Void> { + private final List<String> lines; + private final int limit; + private VerifyAnySample(List<String> lines, int limit) { + this.lines = lines; + this.limit = limit; } - assertEquals(expectedSize, actual.size()); + @Override + public Void apply(Iterable<String> actualIter) { + final int expectedSize = Math.min(limit, lines.size()); - Collections.sort(actual); // We assume that @expected is already sorted. - int i = 0; // Index into @expected - for (T s : actual) { - boolean matchFound = false; - for (; i < expectedValues.length; i++) { - if (s.equals(expectedValues[i])) { - matchFound = true; - break; - } + // Make sure actual is the right length, and is a + // subset of expected. + List<String> actual = new ArrayList<>(); + for (String s : actualIter) { + actual.add(s); } - assertTrue("Invalid sample: " + Joiner.on(',').join(actual), matchFound); - i++; // Don't match the same element again. + assertEquals(expectedSize, actual.size()); + Set<String> actualAsSet = new TreeSet<>(actual); + Set<String> linesAsSet = new TreeSet<>(lines); + assertEquals(actual.size(), actualAsSet.size()); + assertEquals(lines.size(), linesAsSet.size()); + assertTrue(linesAsSet.containsAll(actualAsSet)); + return null; } - return null; } - } - @Rule - public final transient TestPipeline pipeline = TestPipeline.create(); + void runPickAnyTest(final List<String> lines, int limit) { + checkArgument(new HashSet<String>(lines).size() == lines.size(), + "Duplicates are unsupported."); - @Test - @Category(RunnableOnService.class) - public void testSample() { + PCollection<String> input = p.apply(Create.of(lines) + .withCoder(StringUtf8Coder.of())); - PCollection<Integer> input = pipeline.apply(Create.of(DATA) - .withCoder(BigEndianIntegerCoder.of())); - PCollection<Iterable<Integer>> output = input.apply( - Sample.<Integer>fixedSizeGlobally(3)); + PCollection<String> output = + input.apply(Sample.<String>any(limit)); - PAssert.thatSingletonIterable(output) - .satisfies(new VerifyCorrectSample<>(3, DATA)); - pipeline.run(); - } - @Test - @Category(RunnableOnService.class) - public void testSampleEmpty() { + PAssert.that(output) + .satisfies(new VerifyAnySample(lines, limit)); - PCollection<Integer> input = pipeline.apply(Create.of(EMPTY) - .withCoder(BigEndianIntegerCoder.of())); - PCollection<Iterable<Integer>> output = input.apply( - Sample.<Integer>fixedSizeGlobally(3)); + p.run(); + } - PAssert.thatSingletonIterable(output) - .satisfies(new VerifyCorrectSample<>(0, EMPTY)); - pipeline.run(); + @Test + @Category(RunnableOnService.class) + public void testPickAny() { + runPickAnyTest(LINES, limit); + } } - @Test - @Category(RunnableOnService.class) - public void testSampleZero() { + /** + * Further tests for Sample transform. + */ + @RunWith(JUnit4.class) + public static class MiscTest { - PCollection<Integer> input = pipeline.apply(Create.of(DATA) - .withCoder(BigEndianIntegerCoder.of())); - PCollection<Iterable<Integer>> output = input.apply( - Sample.<Integer>fixedSizeGlobally(0)); + @Rule + public final transient TestPipeline pipeline = TestPipeline.create(); - PAssert.thatSingletonIterable(output) - .satisfies(new VerifyCorrectSample<>(0, DATA)); - pipeline.run(); - } - - @Test - @Category(RunnableOnService.class) - public void testSampleInsufficientElements() { + /** + * Verifies that the result of a Sample operation contains the expected number of elements, + * and that those elements are a subset of the items in expected. + */ + @SuppressWarnings("rawtypes") + public static class VerifyCorrectSample<T extends Comparable> + implements SerializableFunction<Iterable<T>, Void> { + private T[] expectedValues; + private int expectedSize; + + /** + * expectedSize is the number of elements that the Sample should contain. expected is the set + * of elements that the sample may contain. + */ + @SafeVarargs + VerifyCorrectSample(int expectedSize, T... expected) { + this.expectedValues = expected; + this.expectedSize = expectedSize; + } - PCollection<Integer> input = pipeline.apply(Create.of(DATA) - .withCoder(BigEndianIntegerCoder.of())); - PCollection<Iterable<Integer>> output = input.apply( - Sample.<Integer>fixedSizeGlobally(10)); + @Override + @SuppressWarnings("unchecked") + public Void apply(Iterable<T> in) { + List<T> actual = new ArrayList<>(); + for (T elem : in) { + actual.add(elem); + } - PAssert.thatSingletonIterable(output) - .satisfies(new VerifyCorrectSample<>(5, DATA)); - pipeline.run(); - } + assertEquals(expectedSize, actual.size()); + + Collections.sort(actual); // We assume that @expected is already sorted. + int i = 0; // Index into @expected + for (T s : actual) { + boolean matchFound = false; + for (; i < expectedValues.length; i++) { + if (s.equals(expectedValues[i])) { + matchFound = true; + break; + } + } + assertTrue("Invalid sample: " + Joiner.on(',').join(actual), matchFound); + i++; // Don't match the same element again. + } + return null; + } + } - @Test(expected = IllegalArgumentException.class) - public void testSampleNegative() { - pipeline.enableAbandonedNodeEnforcement(false); + @Test + @Category(RunnableOnService.class) + public void testSample() { - PCollection<Integer> input = pipeline.apply(Create.of(DATA) - .withCoder(BigEndianIntegerCoder.of())); - input.apply(Sample.<Integer>fixedSizeGlobally(-1)); - } + PCollection<Integer> input = pipeline.apply(Create.of(DATA) + .withCoder(BigEndianIntegerCoder.of())); + PCollection<Iterable<Integer>> output = input.apply( + Sample.<Integer>fixedSizeGlobally(3)); - @Test - @Category(RunnableOnService.class) - public void testSampleMultiplicity() { + PAssert.thatSingletonIterable(output) + .satisfies(new VerifyCorrectSample<>(3, DATA)); + pipeline.run(); + } - PCollection<Integer> input = pipeline.apply(Create.of(REPEATED_DATA) - .withCoder(BigEndianIntegerCoder.of())); - // At least one value must be selected with multiplicity. - PCollection<Iterable<Integer>> output = input.apply( - Sample.<Integer>fixedSizeGlobally(6)); + @Test + @Category(RunnableOnService.class) + public void testSampleEmpty() { - PAssert.thatSingletonIterable(output) - .satisfies(new VerifyCorrectSample<>(6, REPEATED_DATA)); - pipeline.run(); - } + PCollection<Integer> input = pipeline.apply(Create.of(EMPTY) + .withCoder(BigEndianIntegerCoder.of())); + PCollection<Iterable<Integer>> output = input.apply( + Sample.<Integer>fixedSizeGlobally(3)); - private static class VerifyAnySample implements SerializableFunction<Iterable<String>, Void> { - private final List<String> lines; - private final int limit; - private VerifyAnySample(List<String> lines, int limit) { - this.lines = lines; - this.limit = limit; + PAssert.thatSingletonIterable(output) + .satisfies(new VerifyCorrectSample<>(0, EMPTY)); + pipeline.run(); } - @Override - public Void apply(Iterable<String> actualIter) { - final int expectedSize = Math.min(limit, lines.size()); + @Test + @Category(RunnableOnService.class) + public void testSampleZero() { - // Make sure actual is the right length, and is a - // subset of expected. - List<String> actual = new ArrayList<>(); - for (String s : actualIter) { - actual.add(s); - } - assertEquals(expectedSize, actual.size()); - Set<String> actualAsSet = new TreeSet<>(actual); - Set<String> linesAsSet = new TreeSet<>(lines); - assertEquals(actual.size(), actualAsSet.size()); - assertEquals(lines.size(), linesAsSet.size()); - assertTrue(linesAsSet.containsAll(actualAsSet)); - return null; - } - } + PCollection<Integer> input = pipeline.apply(Create.of(DATA) + .withCoder(BigEndianIntegerCoder.of())); + PCollection<Iterable<Integer>> output = input.apply( + Sample.<Integer>fixedSizeGlobally(0)); - void runPickAnyTest(final List<String> lines, int limit) { - checkArgument(new HashSet<String>(lines).size() == lines.size(), "Duplicates are unsupported."); - Pipeline p = TestPipeline.create(); + PAssert.thatSingletonIterable(output) + .satisfies(new VerifyCorrectSample<>(0, DATA)); + pipeline.run(); + } - PCollection<String> input = p.apply(Create.of(lines) - .withCoder(StringUtf8Coder.of())); + @Test + @Category(RunnableOnService.class) + public void testSampleInsufficientElements() { - PCollection<String> output = - input.apply(Sample.<String>any(limit)); + PCollection<Integer> input = pipeline.apply(Create.of(DATA) + .withCoder(BigEndianIntegerCoder.of())); + PCollection<Iterable<Integer>> output = input.apply( + Sample.<Integer>fixedSizeGlobally(10)); + PAssert.thatSingletonIterable(output) + .satisfies(new VerifyCorrectSample<>(5, DATA)); + pipeline.run(); + } - PAssert.that(output) - .satisfies(new VerifyAnySample(lines, limit)); + @Test(expected = IllegalArgumentException.class) + public void testSampleNegative() { + pipeline.enableAbandonedNodeEnforcement(false); - p.run(); - } + PCollection<Integer> input = pipeline.apply(Create.of(DATA) + .withCoder(BigEndianIntegerCoder.of())); + input.apply(Sample.<Integer>fixedSizeGlobally(-1)); + } - @Test - @Category(RunnableOnService.class) - public void testPickAny() { - runPickAnyTest(LINES, 0); - runPickAnyTest(LINES, LINES.size() / 2); - runPickAnyTest(LINES, LINES.size() * 2); - } + @Test + @Category(RunnableOnService.class) + public void testSampleMultiplicity() { - @Test - // Extra tests, not worth the time to run on the real service. - @Category(NeedsRunner.class) - public void testPickAnyMore() { - runPickAnyTest(LINES, LINES.size() - 1); - runPickAnyTest(LINES, LINES.size()); - runPickAnyTest(LINES, LINES.size() + 1); - } + PCollection<Integer> input = pipeline.apply(Create.of(REPEATED_DATA) + .withCoder(BigEndianIntegerCoder.of())); + // At least one value must be selected with multiplicity. + PCollection<Iterable<Integer>> output = input.apply( + Sample.<Integer>fixedSizeGlobally(6)); - @Test - @Category(RunnableOnService.class) - public void testPickAnyWhenEmpty() { - runPickAnyTest(NO_LINES, 0); - runPickAnyTest(NO_LINES, 1); - } + PAssert.thatSingletonIterable(output) + .satisfies(new VerifyCorrectSample<>(6, REPEATED_DATA)); + pipeline.run(); + } - @Test - public void testSampleGetName() { - assertEquals("Sample.SampleAny", Sample.<String>any(1).getName()); - } + @Test + public void testSampleGetName() { + assertEquals("Sample.SampleAny", Sample.<String>any(1).getName()); + } - @Test - public void testDisplayData() { - PTransform<?, ?> sampleAny = Sample.any(1234); - DisplayData sampleAnyDisplayData = DisplayData.from(sampleAny); - assertThat(sampleAnyDisplayData, hasDisplayItem("sampleSize", 1234)); + @Test + public void testDisplayData() { + PTransform<?, ?> sampleAny = Sample.any(1234); + DisplayData sampleAnyDisplayData = DisplayData.from(sampleAny); + assertThat(sampleAnyDisplayData, hasDisplayItem("sampleSize", 1234)); - PTransform<?, ?> samplePerKey = Sample.fixedSizePerKey(2345); - DisplayData perKeyDisplayData = DisplayData.from(samplePerKey); - assertThat(perKeyDisplayData, hasDisplayItem("sampleSize", 2345)); + PTransform<?, ?> samplePerKey = Sample.fixedSizePerKey(2345); + DisplayData perKeyDisplayData = DisplayData.from(samplePerKey); + assertThat(perKeyDisplayData, hasDisplayItem("sampleSize", 2345)); + } } }