Move LatestFnTests to LatestFnTest
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52e43ac7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52e43ac7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52e43ac7
Branch: refs/heads/master
Commit: 52e43ac7b8257ecbcda61eb3b14406c36df08a3b
Parents: 60a8aef
Author: Kenneth Knowles <k...@google.com>
Authored: Tue Oct 4 13:23:37 2016 -0700
Committer: Luke Cwik <lc...@google.com>
Committed: Thu Oct 6 09:49:53 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/transforms/LatestFnTest.java | 233 +++++++++++++++++++
.../beam/sdk/transforms/LatestFnTests.java | 233 -------------------
2 files changed, 233 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/52e43ac7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
new file mode 100644
index 0000000..31acb08
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.isOneOf; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Objects; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link Latest.LatestFn}. 
+ * */ +@RunWith(JUnit4.class) +public class LatestFnTest { + private static final Instant INSTANT = new Instant(100); + private static final long VALUE = 100 * INSTANT.getMillis(); + + private static final TimestampedValue<Long> TV = TimestampedValue.of(VALUE, INSTANT); + private static final TimestampedValue<Long> TV_MINUS_TEN = + TimestampedValue.of(VALUE - 10, INSTANT.minus(10)); + private static final TimestampedValue<Long> TV_PLUS_TEN = + TimestampedValue.of(VALUE + 10, INSTANT.plus(10)); + + @Rule + public final ExpectedException thrown = ExpectedException.none(); + + private final Latest.LatestFn<Long> fn = new Latest.LatestFn<>(); + private final Instant baseTimestamp = Instant.now(); + + @Test + public void testDefaultValue() { + assertThat(fn.defaultValue(), nullValue()); + } + + @Test + public void testCreateAccumulator() { + assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator()); + } + + @Test + public void testAddInputInitialAdd() { + TimestampedValue<Long> input = TV; + assertEquals(input, fn.addInput(fn.createAccumulator(), input)); + } + + @Test + public void testAddInputMinTimestamp() { + TimestampedValue<Long> input = TimestampedValue.atMinimumTimestamp(1234L); + assertEquals(input, fn.addInput(fn.createAccumulator(), input)); + } + + @Test + public void testAddInputEarlierValue() { + assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN)); + } + + @Test + public void testAddInputLaterValue() { + assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN)); + } + + @Test + public void testAddInputSameTimestamp() { + TimestampedValue<Long> accum = TimestampedValue.of(100L, INSTANT); + TimestampedValue<Long> input = TimestampedValue.of(200L, INSTANT); + + assertThat("Latest for values with the same timestamp is chosen arbitrarily", + fn.addInput(accum, input), isOneOf(accum, input)); + } + + @Test + public void testAddInputNullAccumulator() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("accumulator"); + 
fn.addInput(null, TV); + } + + @Test + public void testAddInputNullInput() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("input"); + fn.addInput(TV, null); + } + + @Test + public void testAddInputNullValue() { + TimestampedValue<Long> input = TimestampedValue.of(null, INSTANT.plus(10)); + assertEquals("Null values are allowed", input, fn.addInput(TV, input)); + } + + @Test + public void testMergeAccumulatorsMultipleValues() { + Iterable<TimestampedValue<Long>> accums = Lists.newArrayList( + TV, + TV_PLUS_TEN, + TV_MINUS_TEN + ); + + assertEquals(TV_PLUS_TEN, fn.mergeAccumulators(accums)); + } + + @Test + public void testMergeAccumulatorsSingleValue() { + assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV))); + } + + @Test + public void testMergeAccumulatorsEmptyIterable() { + ArrayList<TimestampedValue<Long>> emptyAccums = Lists.newArrayList(); + assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.mergeAccumulators(emptyAccums)); + } + + @Test + public void testMergeAccumulatorsDefaultAccumulator() { + TimestampedValue<Long> defaultAccum = fn.createAccumulator(); + assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV, defaultAccum))); + } + + @Test + public void testMergeAccumulatorsAllDefaultAccumulators() { + TimestampedValue<Long> defaultAccum = fn.createAccumulator(); + assertEquals(defaultAccum, fn.mergeAccumulators( + Lists.newArrayList(defaultAccum, defaultAccum))); + } + + @Test + public void testMergeAccumulatorsNullIterable() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("accumulators"); + fn.mergeAccumulators(null); + } + + @Test + public void testExtractOutput() { + assertEquals(TV.getValue(), fn.extractOutput(TV)); + } + + @Test + public void testExtractOutputDefaultAggregator() { + TimestampedValue<Long> accum = fn.createAccumulator(); + assertThat(fn.extractOutput(accum), nullValue()); + } + + @Test + public void testExtractOutputNullValue() { + TimestampedValue<Long> accum = 
TimestampedValue.of(null, baseTimestamp); + assertEquals(null, fn.extractOutput(accum)); + } + + @Test + public void testAggregator() throws Exception { + LatestAggregatorsFn<Long> doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue()); + DoFnTester<Long, Long> harness = DoFnTester.of(doFn); + for (TimestampedValue<Long> element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) { + harness.processTimestampedElement(element); + } + + assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg)); + assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg)); + assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue()); + } + + @Test + public void testDefaultCoderHandlesNull() throws CannotProvideCoderException { + Latest.LatestFn<Long> fn = new Latest.LatestFn<>(); + + CoderRegistry registry = new CoderRegistry(); + TimestampedValue.TimestampedValueCoder<Long> inputCoder = + TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of()); + + assertThat("Default output coder should handle null values", + fn.getDefaultOutputCoder(registry, inputCoder), instanceOf(NullableCoder.class)); + assertThat("Default accumulator coder should handle null values", + fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class)); + } + + static class LatestAggregatorsFn<T> extends DoFn<T, T> { + private final T specialValue; + LatestAggregatorsFn(T specialValue) { + this.specialValue = specialValue; + } + + Aggregator<TimestampedValue<T>, T> allValuesAgg = + createAggregator("allValues", new Latest.LatestFn<T>()); + + Aggregator<TimestampedValue<T>, T> specialValueAgg = + createAggregator("oneValue", new Latest.LatestFn<T>()); + + Aggregator<TimestampedValue<T>, T> noValuesAgg = + createAggregator("noValues", new Latest.LatestFn<T>()); + + @ProcessElement + public void processElement(ProcessContext c) { + TimestampedValue<T> val = TimestampedValue.of(c.element(), c.timestamp()); + 
allValuesAgg.addValue(val); + if (Objects.equals(c.element(), specialValue)) { + specialValueAgg.addValue(val); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/52e43ac7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java deleted file mode 100644 index 459a966..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.beam.sdk.transforms; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.isOneOf; -import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertEquals; - -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Objects; -import org.apache.beam.sdk.coders.CannotProvideCoderException; -import org.apache.beam.sdk.coders.CoderRegistry; -import org.apache.beam.sdk.coders.NullableCoder; -import org.apache.beam.sdk.coders.VarLongCoder; -import org.apache.beam.sdk.values.TimestampedValue; -import org.joda.time.Instant; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Unit tests for {@link Latest.LatestFn}. - * */ -@RunWith(JUnit4.class) -public class LatestFnTests { - private static final Instant INSTANT = new Instant(100); - private static final long VALUE = 100 * INSTANT.getMillis(); - - private static final TimestampedValue<Long> TV = TimestampedValue.of(VALUE, INSTANT); - private static final TimestampedValue<Long> TV_MINUS_TEN = - TimestampedValue.of(VALUE - 10, INSTANT.minus(10)); - private static final TimestampedValue<Long> TV_PLUS_TEN = - TimestampedValue.of(VALUE + 10, INSTANT.plus(10)); - - @Rule - public final ExpectedException thrown = ExpectedException.none(); - - private final Latest.LatestFn<Long> fn = new Latest.LatestFn<>(); - private final Instant baseTimestamp = Instant.now(); - - @Test - public void testDefaultValue() { - assertThat(fn.defaultValue(), nullValue()); - } - - @Test - public void testCreateAccumulator() { - assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator()); - } - - @Test - public void testAddInputInitialAdd() { - TimestampedValue<Long> input = TV; - assertEquals(input, 
fn.addInput(fn.createAccumulator(), input)); - } - - @Test - public void testAddInputMinTimestamp() { - TimestampedValue<Long> input = TimestampedValue.atMinimumTimestamp(1234L); - assertEquals(input, fn.addInput(fn.createAccumulator(), input)); - } - - @Test - public void testAddInputEarlierValue() { - assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN)); - } - - @Test - public void testAddInputLaterValue() { - assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN)); - } - - @Test - public void testAddInputSameTimestamp() { - TimestampedValue<Long> accum = TimestampedValue.of(100L, INSTANT); - TimestampedValue<Long> input = TimestampedValue.of(200L, INSTANT); - - assertThat("Latest for values with the same timestamp is chosen arbitrarily", - fn.addInput(accum, input), isOneOf(accum, input)); - } - - @Test - public void testAddInputNullAccumulator() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("accumulator"); - fn.addInput(null, TV); - } - - @Test - public void testAddInputNullInput() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("input"); - fn.addInput(TV, null); - } - - @Test - public void testAddInputNullValue() { - TimestampedValue<Long> input = TimestampedValue.of(null, INSTANT.plus(10)); - assertEquals("Null values are allowed", input, fn.addInput(TV, input)); - } - - @Test - public void testMergeAccumulatorsMultipleValues() { - Iterable<TimestampedValue<Long>> accums = Lists.newArrayList( - TV, - TV_PLUS_TEN, - TV_MINUS_TEN - ); - - assertEquals(TV_PLUS_TEN, fn.mergeAccumulators(accums)); - } - - @Test - public void testMergeAccumulatorsSingleValue() { - assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV))); - } - - @Test - public void testMergeAccumulatorsEmptyIterable() { - ArrayList<TimestampedValue<Long>> emptyAccums = Lists.newArrayList(); - assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.mergeAccumulators(emptyAccums)); - } - - @Test - public void testMergeAccumulatorsDefaultAccumulator() 
{ - TimestampedValue<Long> defaultAccum = fn.createAccumulator(); - assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV, defaultAccum))); - } - - @Test - public void testMergeAccumulatorsAllDefaultAccumulators() { - TimestampedValue<Long> defaultAccum = fn.createAccumulator(); - assertEquals(defaultAccum, fn.mergeAccumulators( - Lists.newArrayList(defaultAccum, defaultAccum))); - } - - @Test - public void testMergeAccumulatorsNullIterable() { - thrown.expect(NullPointerException.class); - thrown.expectMessage("accumulators"); - fn.mergeAccumulators(null); - } - - @Test - public void testExtractOutput() { - assertEquals(TV.getValue(), fn.extractOutput(TV)); - } - - @Test - public void testExtractOutputDefaultAggregator() { - TimestampedValue<Long> accum = fn.createAccumulator(); - assertThat(fn.extractOutput(accum), nullValue()); - } - - @Test - public void testExtractOutputNullValue() { - TimestampedValue<Long> accum = TimestampedValue.of(null, baseTimestamp); - assertEquals(null, fn.extractOutput(accum)); - } - - @Test - public void testAggregator() throws Exception { - LatestAggregatorsFn<Long> doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue()); - DoFnTester<Long, Long> harness = DoFnTester.of(doFn); - for (TimestampedValue<Long> element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) { - harness.processTimestampedElement(element); - } - - assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg)); - assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg)); - assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue()); - } - - @Test - public void testDefaultCoderHandlesNull() throws CannotProvideCoderException { - Latest.LatestFn<Long> fn = new Latest.LatestFn<>(); - - CoderRegistry registry = new CoderRegistry(); - TimestampedValue.TimestampedValueCoder<Long> inputCoder = - TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of()); - - assertThat("Default output coder should 
handle null values", - fn.getDefaultOutputCoder(registry, inputCoder), instanceOf(NullableCoder.class)); - assertThat("Default accumulator coder should handle null values", - fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class)); - } - - static class LatestAggregatorsFn<T> extends DoFn<T, T> { - private final T specialValue; - LatestAggregatorsFn(T specialValue) { - this.specialValue = specialValue; - } - - Aggregator<TimestampedValue<T>, T> allValuesAgg = - createAggregator("allValues", new Latest.LatestFn<T>()); - - Aggregator<TimestampedValue<T>, T> specialValueAgg = - createAggregator("oneValue", new Latest.LatestFn<T>()); - - Aggregator<TimestampedValue<T>, T> noValuesAgg = - createAggregator("noValues", new Latest.LatestFn<T>()); - - @ProcessElement - public void processElement(ProcessContext c) { - TimestampedValue<T> val = TimestampedValue.of(c.element(), c.timestamp()); - allValuesAgg.addValue(val); - if (Objects.equals(c.element(), specialValue)) { - specialValueAgg.addValue(val); - } - } - } -}