Repository: incubator-beam
Updated Branches:
  refs/heads/master 774944014 -> 2b269559f


Use TimestampedValue in DoFnTester

This removes the duplicate OutputElementWithTimestamp data structure.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4da5ebfb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4da5ebfb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4da5ebfb

Branch: refs/heads/master
Commit: 4da5ebfbf021051288620634ec84cafa9208265c
Parents: 7749440
Author: Thomas Groh <tg...@google.com>
Authored: Tue Jun 14 13:39:59 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Jun 14 13:39:59 2016 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/transforms/DoFnTester.java  | 56 ++++----------------
 .../beam/sdk/transforms/DoFnTesterTest.java     | 22 ++++----
 2 files changed, 20 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4da5ebfb/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 332ea13..1df42e2 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -28,12 +28,12 @@ import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TimestampedValue;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 
 import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
-import com.google.common.base.Objects;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 
@@ -256,10 +256,10 @@ public class DoFnTester<InputT, OutputT> {
     // TODO: Should we return an unmodifiable list?
     return Lists.transform(
         peekOutputElementsWithTimestamp(),
-        new Function<OutputElementWithTimestamp<OutputT>, OutputT>() {
+        new Function<TimestampedValue<OutputT>, OutputT>() {
           @Override
           @SuppressWarnings("unchecked")
-          public OutputT apply(OutputElementWithTimestamp<OutputT> input) {
+          public OutputT apply(TimestampedValue<OutputT> input) {
             return input.getValue();
           }
         });
@@ -274,16 +274,14 @@ public class DoFnTester<InputT, OutputT> {
    * @see #clearOutputElements
    */
   @Experimental
-  public List<OutputElementWithTimestamp<OutputT>> 
peekOutputElementsWithTimestamp() {
+  public List<TimestampedValue<OutputT>> peekOutputElementsWithTimestamp() {
     // TODO: Should we return an unmodifiable list?
     return Lists.transform(getOutput(mainOutputTag),
-        new Function<Object, OutputElementWithTimestamp<OutputT>>() {
+        new Function<WindowedValue<OutputT>, TimestampedValue<OutputT>>() {
           @Override
           @SuppressWarnings("unchecked")
-          public OutputElementWithTimestamp<OutputT> apply(Object input) {
-            return new OutputElementWithTimestamp<OutputT>(
-                ((WindowedValue<OutputT>) input).getValue(),
-                ((WindowedValue<OutputT>) input).getTimestamp());
+          public TimestampedValue<OutputT> apply(WindowedValue<OutputT> input) 
{
+            return TimestampedValue.of(input.getValue(), input.getTimestamp());
           }
         });
   }
@@ -318,8 +316,8 @@ public class DoFnTester<InputT, OutputT> {
    * @see #clearOutputElements
    */
   @Experimental
-  public List<OutputElementWithTimestamp<OutputT>> 
takeOutputElementsWithTimestamp() {
-    List<OutputElementWithTimestamp<OutputT>> resultElems =
+  public List<TimestampedValue<OutputT>> takeOutputElementsWithTimestamp() {
+    List<TimestampedValue<OutputT>> resultElems =
         new ArrayList<>(peekOutputElementsWithTimestamp());
     clearOutputElements();
     return resultElems;
@@ -383,42 +381,6 @@ public class DoFnTester<InputT, OutputT> {
     return combiner.extractOutput(accumulator);
   }
 
-  /**
-   * Holder for an OutputElement along with its associated timestamp.
-   */
-  @Experimental
-  public static class OutputElementWithTimestamp<OutputT> {
-    private final OutputT value;
-    private final Instant timestamp;
-
-    OutputElementWithTimestamp(OutputT value, Instant timestamp) {
-      this.value = value;
-      this.timestamp = timestamp;
-    }
-
-    OutputT getValue() {
-      return value;
-    }
-
-    Instant getTimestamp() {
-      return timestamp;
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (!(obj instanceof OutputElementWithTimestamp)) {
-        return false;
-      }
-      OutputElementWithTimestamp<?> other = (OutputElementWithTimestamp<?>) 
obj;
-      return Objects.equal(other.value, value) && 
Objects.equal(other.timestamp, timestamp);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(value, timestamp);
-    }
-  }
-
   private <T> List<WindowedValue<T>> getOutput(TupleTag<T> tag) {
     @SuppressWarnings({"unchecked", "rawtypes"})
     List<WindowedValue<T>> elems = (List) outputs.get(tag);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4da5ebfb/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
index ec22251..490ed7f 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java
@@ -23,7 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.beam.sdk.transforms.DoFnTester.OutputElementWithTimestamp;
+import org.apache.beam.sdk.values.TimestampedValue;
 
 import org.joda.time.Instant;
 import org.junit.Test;
@@ -145,23 +145,23 @@ public class DoFnTesterTest {
     tester.processElement(1L);
     tester.processElement(2L);
 
-    List<OutputElementWithTimestamp<String>> peek = 
tester.peekOutputElementsWithTimestamp();
-    OutputElementWithTimestamp<String> one =
-        new OutputElementWithTimestamp<>("1", new Instant(1000L));
-    OutputElementWithTimestamp<String> two =
-        new OutputElementWithTimestamp<>("2", new Instant(2000L));
+    List<TimestampedValue<String>> peek = 
tester.peekOutputElementsWithTimestamp();
+    TimestampedValue<String> one =
+        TimestampedValue.of("1", new Instant(1000L));
+    TimestampedValue<String> two =
+        TimestampedValue.of("2", new Instant(2000L));
     assertThat(peek, hasItems(one, two));
 
     tester.processElement(3L);
     tester.processElement(4L);
 
-    OutputElementWithTimestamp<String> three =
-        new OutputElementWithTimestamp<>("3", new Instant(3000L));
-    OutputElementWithTimestamp<String> four =
-        new OutputElementWithTimestamp<>("4", new Instant(4000L));
+    TimestampedValue<String> three =
+        TimestampedValue.of("3", new Instant(3000L));
+    TimestampedValue<String> four =
+        TimestampedValue.of("4", new Instant(4000L));
     peek = tester.peekOutputElementsWithTimestamp();
     assertThat(peek, hasItems(one, two, three, four));
-    List<OutputElementWithTimestamp<String>> take = 
tester.takeOutputElementsWithTimestamp();
+    List<TimestampedValue<String>> take = 
tester.takeOutputElementsWithTimestamp();
     assertThat(take, hasItems(one, two, three, four));
 
     // Following takeOutputElementsWithTimestamp(), neither 
takeOutputElementsWithTimestamp()

Reply via email to