Repository: incubator-beam
Updated Branches:
  refs/heads/master 0c47cad48 -> 96e286fec


[flink] improve lifecycle handling of GroupAlsoByWindowWrapper


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/033b9240
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/033b9240
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/033b9240

Branch: refs/heads/master
Commit: 033b9240765543438068c1adea6d0cff34ddcd53
Parents: 17863c8
Author: Maximilian Michels <m...@apache.org>
Authored: Mon Mar 28 11:31:38 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Mar 30 11:31:56 2016 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java      | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/033b9240/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
index b413d7a..751d44c 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java
@@ -220,6 +220,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
   public void open() throws Exception {
     super.open();
     this.context = new ProcessContext(operator, new TimestampedCollector<>(output), this.timerInternals);
+    operator.startBundle(context);
   }
 
   /**
@@ -252,11 +253,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
 
   private void processKeyedWorkItem(KeyedWorkItem<K, VIN> workItem) throws Exception {
     context.setElement(workItem, getStateInternalsForKey(workItem.key()));
-
-    // TODO: Ideally startBundle/finishBundle would be called when the operator is first used / about to be discarded.
-    operator.startBundle(context);
     operator.processElement(context);
-    operator.finishBundle(context);
   }
 
   @Override
@@ -309,6 +306,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT>
 
   @Override
   public void close() throws Exception {
+    operator.finishBundle(context);
     super.close();
   }
 

Reply via email to