Repository: incubator-beam Updated Branches: refs/heads/master 0c47cad48 -> 96e286fec
[flink] improve lifecycle handling of GroupAlsoByWindowWrapper Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/033b9240 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/033b9240 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/033b9240 Branch: refs/heads/master Commit: 033b9240765543438068c1adea6d0cff34ddcd53 Parents: 17863c8 Author: Maximilian Michels <m...@apache.org> Authored: Mon Mar 28 11:31:38 2016 +0200 Committer: Maximilian Michels <m...@apache.org> Committed: Wed Mar 30 11:31:56 2016 +0200 ---------------------------------------------------------------------- .../wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/033b9240/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index b413d7a..751d44c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -220,6 +220,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> public void open() throws Exception { super.open(); this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals); + operator.startBundle(context); } /** @@ -252,11 +253,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception { context.setElement(workItem, getStateInternalsForKey(workItem.key())); - - // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded. - operator.startBundle(context); operator.processElement(context); - operator.finishBundle(context); } @Override @@ -309,6 +306,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> @Override public void close() throws Exception { + operator.finishBundle(context); super.close(); }