Repository: incubator-beam Updated Branches: refs/heads/master 774944014 -> 2b269559f
Use TimestampedValue in DoFnTester This removes the duplicate OutputElementWithTimestamp data structure. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4da5ebfb Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4da5ebfb Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4da5ebfb Branch: refs/heads/master Commit: 4da5ebfbf021051288620634ec84cafa9208265c Parents: 7749440 Author: Thomas Groh <tg...@google.com> Authored: Tue Jun 14 13:39:59 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Tue Jun 14 13:39:59 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/DoFnTester.java | 56 ++++---------------- .../beam/sdk/transforms/DoFnTesterTest.java | 22 ++++---- 2 files changed, 20 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4da5ebfb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 332ea13..1df42e2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -28,12 +28,12 @@ import org.apache.beam.sdk.util.SerializableUtils; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.TimestampedValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import com.google.common.base.Function; import com.google.common.base.MoreObjects; -import com.google.common.base.Objects; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; @@ -256,10 +256,10 @@ public class DoFnTester<InputT, OutputT> { // TODO: Should we return an unmodifiable list? return Lists.transform( peekOutputElementsWithTimestamp(), - new Function<OutputElementWithTimestamp<OutputT>, OutputT>() { + new Function<TimestampedValue<OutputT>, OutputT>() { @Override @SuppressWarnings("unchecked") - public OutputT apply(OutputElementWithTimestamp<OutputT> input) { + public OutputT apply(TimestampedValue<OutputT> input) { return input.getValue(); } }); @@ -274,16 +274,14 @@ public class DoFnTester<InputT, OutputT> { * @see #clearOutputElements */ @Experimental - public List<OutputElementWithTimestamp<OutputT>> peekOutputElementsWithTimestamp() { + public List<TimestampedValue<OutputT>> peekOutputElementsWithTimestamp() { // TODO: Should we return an unmodifiable list? return Lists.transform(getOutput(mainOutputTag), - new Function<Object, OutputElementWithTimestamp<OutputT>>() { + new Function<WindowedValue<OutputT>, TimestampedValue<OutputT>>() { @Override @SuppressWarnings("unchecked") - public OutputElementWithTimestamp<OutputT> apply(Object input) { - return new OutputElementWithTimestamp<OutputT>( - ((WindowedValue<OutputT>) input).getValue(), - ((WindowedValue<OutputT>) input).getTimestamp()); + public TimestampedValue<OutputT> apply(WindowedValue<OutputT> input) { + return TimestampedValue.of(input.getValue(), input.getTimestamp()); } }); } @@ -318,8 +316,8 @@ public class DoFnTester<InputT, OutputT> { * @see #clearOutputElements */ @Experimental - public List<OutputElementWithTimestamp<OutputT>> takeOutputElementsWithTimestamp() { - List<OutputElementWithTimestamp<OutputT>> resultElems = + public List<TimestampedValue<OutputT>> takeOutputElementsWithTimestamp() { + List<TimestampedValue<OutputT>> resultElems = new ArrayList<>(peekOutputElementsWithTimestamp()); clearOutputElements(); return resultElems; @@ -383,42 +381,6 @@ public class DoFnTester<InputT, OutputT> { return combiner.extractOutput(accumulator); } - /** - * Holder for an OutputElement along with its associated timestamp. - */ - @Experimental - public static class OutputElementWithTimestamp<OutputT> { - private final OutputT value; - private final Instant timestamp; - - OutputElementWithTimestamp(OutputT value, Instant timestamp) { - this.value = value; - this.timestamp = timestamp; - } - - OutputT getValue() { - return value; - } - - Instant getTimestamp() { - return timestamp; - } - - @Override - public boolean equals(Object obj) { - if (!(obj instanceof OutputElementWithTimestamp)) { - return false; - } - OutputElementWithTimestamp<?> other = (OutputElementWithTimestamp<?>) obj; - return Objects.equal(other.value, value) && Objects.equal(other.timestamp, timestamp); - } - - @Override - public int hashCode() { - return Objects.hashCode(value, timestamp); - } - } - private <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) { @SuppressWarnings({"unchecked", "rawtypes"}) List<WindowedValue<T>> elems = (List) outputs.get(tag); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4da5ebfb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index ec22251..490ed7f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -23,7 +23,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import org.apache.beam.sdk.transforms.DoFnTester.OutputElementWithTimestamp; +import org.apache.beam.sdk.values.TimestampedValue; import org.joda.time.Instant; import org.junit.Test; @@ -145,23 +145,23 @@ public class DoFnTesterTest { tester.processElement(1L); tester.processElement(2L); - List<OutputElementWithTimestamp<String>> peek = tester.peekOutputElementsWithTimestamp(); - OutputElementWithTimestamp<String> one = - new OutputElementWithTimestamp<>("1", new Instant(1000L)); - OutputElementWithTimestamp<String> two = - new OutputElementWithTimestamp<>("2", new Instant(2000L)); + List<TimestampedValue<String>> peek = tester.peekOutputElementsWithTimestamp(); + TimestampedValue<String> one = + TimestampedValue.of("1", new Instant(1000L)); + TimestampedValue<String> two = + TimestampedValue.of("2", new Instant(2000L)); assertThat(peek, hasItems(one, two)); tester.processElement(3L); tester.processElement(4L); - OutputElementWithTimestamp<String> three = - new OutputElementWithTimestamp<>("3", new Instant(3000L)); - OutputElementWithTimestamp<String> four = - new OutputElementWithTimestamp<>("4", new Instant(4000L)); + TimestampedValue<String> three = + TimestampedValue.of("3", new Instant(3000L)); + TimestampedValue<String> four = + TimestampedValue.of("4", new Instant(4000L)); peek = tester.peekOutputElementsWithTimestamp(); assertThat(peek, hasItems(one, two, three, four)); - List<OutputElementWithTimestamp<String>> take = tester.takeOutputElementsWithTimestamp(); + List<TimestampedValue<String>> take = tester.takeOutputElementsWithTimestamp(); assertThat(take, hasItems(one, two, three, four)); // Following takeOutputElementsWithTimestamp(), neither takeOutputElementsWithTimestamp()