Respect WindowFn#getOutputTime in gearpump-runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/98854d4d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/98854d4d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/98854d4d Branch: refs/heads/master Commit: 98854d4d01ca526ea4a44dc077d2cfb4cddf9914 Parents: 3c7e3e6 Author: manuzhang <owenzhang1...@gmail.com> Authored: Fri May 19 09:19:42 2017 +0800 Committer: manuzhang <owenzhang1...@gmail.com> Committed: Mon Jun 5 19:16:53 2017 +0800 ---------------------------------------------------------------------- .../gearpump/translators/GroupByKeyTranslator.java | 12 ++++++++---- .../gearpump/translators/GroupByKeyTranslatorTest.java | 8 ++++---- 2 files changed, 12 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/98854d4d/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java index 521f665..7d944a4 100644 --- a/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java +++ b/runners/gearpump/src/main/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslator.java @@ -74,7 +74,7 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe new GearpumpWindowFn(windowFn.isNonMerging()), EventTimeTrigger$.MODULE$, Discarding$.MODULE$), "assign_window") .groupBy(new GroupByFn<K, V>(inputKeyCoder), parallelism, "group_by_Key_and_Window") - .map(new KeyedByTimestamp<K, V>(timestampCombiner), "keyed_by_timestamp") + .map(new KeyedByTimestamp<K, V>(windowFn, timestampCombiner), "keyed_by_timestamp") .fold(new Merge<>(windowFn, timestampCombiner), "merge") .map(new Values<K, V>(), "values"); @@ -146,17 +146,21 @@ public class GroupByKeyTranslator<K, V> implements TransformTranslator<GroupByKe extends MapFunction<WindowedValue<KV<K, V>>, KV<Instant, WindowedValue<KV<K, V>>>> { + private final WindowFn<KV<K, V>, BoundedWindow> windowFn; private final TimestampCombiner timestampCombiner; - public KeyedByTimestamp(TimestampCombiner timestampCombiner) { + public KeyedByTimestamp(WindowFn<KV<K, V>, BoundedWindow> windowFn, + TimestampCombiner timestampCombiner) { + this.windowFn = windowFn; this.timestampCombiner = timestampCombiner; } @Override public KV<org.joda.time.Instant, WindowedValue<KV<K, V>>> map( WindowedValue<KV<K, V>> wv) { - Instant timestamp = timestampCombiner.assign( - Iterables.getOnlyElement(wv.getWindows()), wv.getTimestamp()); + BoundedWindow window = Iterables.getOnlyElement(wv.getWindows()); + Instant timestamp = timestampCombiner.assign(window + , windowFn.getOutputTime(wv.getTimestamp(), window)); return KV.of(timestamp, wv); } } http://git-wip-us.apache.org/repos/asf/beam/blob/98854d4d/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java index 86b60aa..d5b931b 100644 --- a/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java +++ b/runners/gearpump/src/test/java/org/apache/beam/runners/gearpump/translators/GroupByKeyTranslatorTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.time.Instant; @@ -95,18 +94,19 @@ public class GroupByKeyTranslatorTest { @Test @SuppressWarnings({"rawtypes", "unchecked"}) public void testKeyedByTimestamp() { + WindowFn slidingWindows = Sessions.withGapDuration(Duration.millis(10)); BoundedWindow window = new IntervalWindow(new org.joda.time.Instant(0), new org.joda.time.Instant(10)); GroupByKeyTranslator.KeyedByTimestamp keyedByTimestamp = - new GroupByKeyTranslator.KeyedByTimestamp(timestampCombiner); + new GroupByKeyTranslator.KeyedByTimestamp(slidingWindows, timestampCombiner); WindowedValue<KV<String, String>> value = WindowedValue.of( KV.of("key", "val"), org.joda.time.Instant.now(), window, PaneInfo.NO_FIRING); KV<org.joda.time.Instant, WindowedValue<KV<String, String>>> result = keyedByTimestamp.map(value); org.joda.time.Instant time = - timestampCombiner.assign(Iterables.getOnlyElement(value.getWindows()), - value.getTimestamp()); + timestampCombiner.assign(window, + slidingWindows.getOutputTime(value.getTimestamp(), window)); assertThat(result, equalTo(KV.of(time, value))); }