Repository: beam
Updated Branches:
  refs/heads/master 4ccbdbc38 -> 72fef99a6
Drop late data in Flink runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0454a189 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0454a189 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0454a189 Branch: refs/heads/master Commit: 0454a1897c645f674754bc9ef69dc7bab2b3c3ba Parents: 7da5a2c Author: Kenneth Knowles <k...@google.com> Authored: Wed Feb 1 18:25:42 2017 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Wed Feb 1 18:25:42 2017 -0800 ---------------------------------------------------------------------- .../wrappers/streaming/DoFnOperator.java | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0454a189/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index de0264a..c1d33f7 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -33,6 +33,7 @@ import org.apache.beam.runners.core.AggregatorFactory; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; +import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; @@ -234,6 +235,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> doFnInvoker.invokeSetup(); + ExecutionContext.StepContext stepContext = createStepContext(); + DoFnRunner<InputT, FnOutputT> doFnRunner = DoFnRunners.simpleRunner( serializedOptions.getPipelineOptions(), doFn, @@ -241,13 +244,26 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> outputManager, mainOutputTag, sideOutputTags, - createStepContext(), + stepContext, aggregatorFactory, windowingStrategy); + if (doFn instanceof GroupAlsoByWindowViaWindowSetNewDoFn) { + // When the doFn is this, we know it came from WindowDoFnOperator and + // InputT = KeyedWorkItem<K, V> + // OutputT = KV<K, V> + // + // for some K, V + + doFnRunner = DoFnRunners.lateDataDroppingRunner( + (DoFnRunner) doFnRunner, + stepContext, + windowingStrategy, + ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator()); + } + pushbackDoFnRunner = PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); - } @Override