Repository: beam Updated Branches: refs/heads/master d66029caf -> 79b1395c2
[BEAM-1517] Garbage collect user state in Flink Runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6e9f7ad4 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6e9f7ad4 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6e9f7ad4 Branch: refs/heads/master Commit: 6e9f7ad46bcb341d37ac5d7804465dd82ef3b56e Parents: d66029c Author: JingsongLi <lzljs3620...@aliyun.com> Authored: Tue Feb 28 16:08:38 2017 +0800 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Wed Mar 1 17:31:36 2017 +0100 ---------------------------------------------------------------------- .../apache/beam/runners/core/DoFnRunners.java | 39 +++ .../beam/runners/core/StatefulDoFnRunner.java | 233 +++++++++++++++++ .../runners/core/StatefulDoFnRunnerTest.java | 255 +++++++++++++++++++ .../wrappers/streaming/DoFnOperator.java | 4 + 4 files changed, 531 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6e9f7ad4/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index f3972ae..9455eea 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -19,9 +19,15 @@ package org.apache.beam.runners.core; import java.util.List; import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer; +import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner; +import org.apache.beam.runners.core.StatefulDoFnRunner.StateInternalsStateCleaner; +import org.apache.beam.runners.core.StatefulDoFnRunner.TimeInternalsCleanupTimer; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; @@ -116,4 +122,37 @@ public class DoFnRunners { stepContext.timerInternals(), droppedDueToLatenessAggregator); } + + /** + * Returns an implementation of {@link DoFnRunner} that handles + * late data dropping and garbage collection for stateful {@link DoFn DoFns}. + * + * <p>It registers a timer by TimeInternals, and clean all states by StateInternals. + */ + public static <InputT, OutputT, W extends BoundedWindow> + DoFnRunner<InputT, OutputT> defaultStatefulDoFnRunner( + DoFn<InputT, OutputT> fn, + DoFnRunner<InputT, OutputT> doFnRunner, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowingStrategy<?, ?> windowingStrategy) { + Aggregator<Long, Long> droppedDueToLateness = aggregatorFactory.createAggregatorForDoFn( + fn.getClass(), stepContext, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER, + Sum.ofLongs()); + + CleanupTimer cleanupTimer = + new TimeInternalsCleanupTimer(stepContext.timerInternals(), windowingStrategy); + + Coder<W> windowCoder = (Coder<W>) windowingStrategy.getWindowFn().windowCoder(); + StateCleaner<W> stateCleaner = + new StateInternalsStateCleaner<>(fn, stepContext.stateInternals(), windowCoder); + + return new StatefulDoFnRunner<>( + doFnRunner, + windowingStrategy, + cleanupTimer, + stateCleaner, + droppedDueToLateness); + } + } http://git-wip-us.apache.org/repos/asf/beam/blob/6e9f7ad4/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java new file mode 100644 index 0000000..154d8bc --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.WindowTracing; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.State; +import org.apache.beam.sdk.util.state.StateSpec; +import org.joda.time.Instant; + +/** + * A customized {@link DoFnRunner} that handles late data dropping and garbage collection for + * stateful {@link DoFn DoFns}. It registers a GC timer in {@link #processElement(WindowedValue)} + * and does cleanup in {@link #onTimer(String, BoundedWindow, Instant, TimeDomain)} + * + * @param <InputT> the type of the {@link DoFn} (main) input elements + * @param <OutputT> the type of the {@link DoFn} (main) output elements + */ +public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow> + implements DoFnRunner<InputT, OutputT> { + + public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId"; + public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "StatefulParDoDropped"; + + private final DoFnRunner<InputT, OutputT> doFnRunner; + private final WindowingStrategy<?, ?> windowingStrategy; + private final Aggregator<Long, Long> droppedDueToLateness; + private final CleanupTimer cleanupTimer; + private final StateCleaner stateCleaner; + + public StatefulDoFnRunner( + DoFnRunner<InputT, OutputT> doFnRunner, + WindowingStrategy<?, ?> windowingStrategy, + CleanupTimer cleanupTimer, + StateCleaner<W> stateCleaner, + Aggregator<Long, Long> droppedDueToLateness) { + this.doFnRunner = doFnRunner; + this.windowingStrategy = windowingStrategy; + this.cleanupTimer = cleanupTimer; + this.stateCleaner = stateCleaner; + WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn(); + rejectMergingWindowFn(windowFn); + this.droppedDueToLateness = droppedDueToLateness; + } + + private void rejectMergingWindowFn(WindowFn<?, ?> windowFn) { + if (!(windowFn instanceof NonMergingWindowFn)) { + throw new UnsupportedOperationException( + "MergingWindowFn is not supported for stateful DoFns, WindowFn is: " + + windowFn); + } + } + + @Override + public void startBundle() { + doFnRunner.startBundle(); + } + + @Override + public void processElement(WindowedValue<InputT> compressedElem) { + + // StatefulDoFnRunner always observes windows, so we need to explode + for (WindowedValue<InputT> value : compressedElem.explodeWindows()) { + + BoundedWindow window = value.getWindows().iterator().next(); + + if (!dropLateData(window)) { + cleanupTimer.setForWindow(window); + doFnRunner.processElement(value); + } + } + } + + private boolean dropLateData(BoundedWindow window) { + Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + Instant inputWM = cleanupTimer.currentInputWatermarkTime(); + if (gcTime.isBefore(inputWM)) { + // The element is too late for this window. + droppedDueToLateness.addValue(1L); + WindowTracing.debug( + "StatefulDoFnRunner.processElement/onTimer: Dropping element for window:{} " + + "since too far behind inputWatermark:{}", window, inputWM); + return true; + } else { + return false; + } + } + + @Override + public void onTimer( + String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { + boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME); + Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + if (isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp)) { + stateCleaner.clearForWindow(window); + // There should invoke the onWindowExpiration of DoFn + } else { + if (isEventTimer || !dropLateData(window)) { + doFnRunner.onTimer(timerId, window, timestamp, timeDomain); + } + } + } + + @Override + public void finishBundle() { + doFnRunner.finishBundle(); + } + + /** + * A cleaner for deciding when to clean state of window. + * + * <p>A runner might either (a) already know that it always has a timer set + * for the expiration time or (b) not need a timer at all because it is + * a batch runner that discards state when it is done. + */ + public interface CleanupTimer { + + /** + * Return the current, local input watermark timestamp for this computation + * in the {@link TimeDomain#EVENT_TIME} time domain. + */ + Instant currentInputWatermarkTime(); + + /** + * Set the garbage collect time of the window to timer. + */ + void setForWindow(BoundedWindow window); + } + + /** + * A cleaner to clean all states of the window. + */ + public interface StateCleaner<W extends BoundedWindow> { + + void clearForWindow(W window); + } + + /** + * A {@link CleanupTimer} implemented by TimerInternals. + */ + public static class TimeInternalsCleanupTimer implements CleanupTimer { + + private final TimerInternals timerInternals; + private final WindowingStrategy<?, ?> windowingStrategy; + private final Coder<BoundedWindow> windowCoder; + + public TimeInternalsCleanupTimer( + TimerInternals timerInternals, + WindowingStrategy<?, ?> windowingStrategy) { + this.windowingStrategy = windowingStrategy; + WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn(); + windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder(); + this.timerInternals = timerInternals; + } + + @Override + public Instant currentInputWatermarkTime() { + return timerInternals.currentInputWatermarkTime(); + } + + @Override + public void setForWindow(BoundedWindow window) { + Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); + timerInternals.setTimer(StateNamespaces.window(windowCoder, window), + GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME); + } + + } + + /** + * A {@link StateCleaner} implemented by StateInternals. + */ + public static class StateInternalsStateCleaner<W extends BoundedWindow> + implements StateCleaner<W> { + + private final DoFn<?, ?> fn; + private final DoFnSignature signature; + private final StateInternals<?> stateInternals; + private final Coder<W> windowCoder; + + public StateInternalsStateCleaner( + DoFn<?, ?> fn, + StateInternals<?> stateInternals, + Coder<W> windowCoder) { + this.fn = fn; + this.signature = DoFnSignatures.getSignature(fn.getClass()); + this.stateInternals = stateInternals; + this.windowCoder = windowCoder; + } + + @Override + public void clearForWindow(W window) { + for (Map.Entry<String, DoFnSignature.StateDeclaration> entry : + signature.stateDeclarations().entrySet()) { + try { + StateSpec<?, ?> spec = (StateSpec<?, ?>) entry.getValue().field().get(fn); + State state = stateInternals.state(StateNamespaces.window(windowCoder, window), + StateTags.tagForSpec(entry.getKey(), (StateSpec) spec)); + state.clear(); + } catch (IllegalAccessException e) { + throw new RuntimeException(e); + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/6e9f7ad4/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java new file mode 100644 index 0000000..54ac77e --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.when; + +import com.google.common.base.MoreObjects; +import java.util.Collections; +import org.apache.beam.runners.core.BaseExecutionContext.StepContext; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.Sum; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.NullSideInputReader; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.StateSpec; +import org.apache.beam.sdk.util.state.StateSpecs; +import org.apache.beam.sdk.util.state.ValueState; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +/** Tests for {@link StatefulDoFnRunnerTest}. */ +@RunWith(JUnit4.class) +public class StatefulDoFnRunnerTest { + + private static final long WINDOW_SIZE = 10; + private static final long ALLOWED_LATENESS = 1; + + private static final WindowingStrategy<?, ?> WINDOWING_STRATEGY = + WindowingStrategy + .of(FixedWindows.of(Duration.millis(WINDOW_SIZE))) + .withAllowedLateness(Duration.millis(ALLOWED_LATENESS)); + + private static final IntervalWindow WINDOW_1 = + new IntervalWindow(new Instant(0), new Instant(10)); + + private static final IntervalWindow WINDOW_2 = + new IntervalWindow(new Instant(10), new Instant(20)); + + @Mock StepContext mockStepContext; + + private InMemoryLongSumAggregator droppedDueToLateness; + private AggregatorFactory aggregatorFactory; + private InMemoryStateInternals<String> stateInternals; + private InMemoryTimerInternals timerInternals; + + private static StateNamespace windowNamespace(IntervalWindow window) { + return StateNamespaces.<IntervalWindow>window( + (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder(), window); + } + + @Before + public void setup() { + MockitoAnnotations.initMocks(this); + when(mockStepContext.timerInternals()).thenReturn(timerInternals); + droppedDueToLateness = new InMemoryLongSumAggregator("droppedDueToLateness"); + + aggregatorFactory = new AggregatorFactory() { + @Override + public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( + Class<?> fnClass, ExecutionContext.StepContext stepContext, String aggregatorName, + Combine.CombineFn<InputT, AccumT, OutputT> combine) { + return (Aggregator<InputT, OutputT>) droppedDueToLateness; + } + }; + + stateInternals = new InMemoryStateInternals<>("hello"); + timerInternals = new InMemoryTimerInternals(); + + when(mockStepContext.stateInternals()).thenReturn((StateInternals) stateInternals); + when(mockStepContext.timerInternals()).thenReturn(timerInternals); + } + + @Test + public void testLateDropping() throws Exception { + + timerInternals.advanceInputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE)); + timerInternals.advanceOutputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE)); + + DoFn<KV<String, Integer>, Integer> fn = new MyDoFn(); + + DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner( + fn, getDoFnRunner(fn), mockStepContext, aggregatorFactory, WINDOWING_STRATEGY); + + runner.startBundle(); + + IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(0L + WINDOW_SIZE)); + Instant timestamp = new Instant(0); + + runner.processElement( + WindowedValue.of(KV.of("hello", 1), timestamp, window, PaneInfo.NO_FIRING)); + assertEquals(1L, droppedDueToLateness.sum); + + runner.onTimer("processTimer", window, timestamp, TimeDomain.PROCESSING_TIME); + assertEquals(2L, droppedDueToLateness.sum); + + runner.onTimer("synchronizedProcessTimer", window, timestamp, + TimeDomain.SYNCHRONIZED_PROCESSING_TIME); + assertEquals(3L, droppedDueToLateness.sum); + + runner.finishBundle(); + } + + @Test + public void testGarbageCollect() throws Exception { + timerInternals.advanceInputWatermark(new Instant(1L)); + + MyDoFn fn = new MyDoFn(); + StateTag<Object, ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId, fn.intState); + + DoFnRunner<KV<String, Integer>, Integer> runner = DoFnRunners.defaultStatefulDoFnRunner( + fn, getDoFnRunner(fn), mockStepContext, aggregatorFactory, WINDOWING_STRATEGY); + + Instant elementTime = new Instant(1); + + // first element, key is hello, WINDOW_1 + runner.processElement( + WindowedValue.of(KV.of("hello", 1), elementTime, WINDOW_1, PaneInfo.NO_FIRING)); + + assertEquals( + 1, (int) stateInternals.state(windowNamespace(WINDOW_1), stateTag).read()); + + // second element, key is hello, WINDOW_2 + runner.processElement( + WindowedValue.of( + KV.of("hello", 1), elementTime.plus(WINDOW_SIZE), WINDOW_2, PaneInfo.NO_FIRING)); + + runner.processElement( + WindowedValue.of( + KV.of("hello", 1), elementTime.plus(WINDOW_SIZE), WINDOW_2, PaneInfo.NO_FIRING)); + + assertEquals( + 2, (int) stateInternals.state(windowNamespace(WINDOW_2), stateTag).read()); + + // advance watermark past end of WINDOW_1 + allowed lateness + advanceInputWatermark( + timerInternals, WINDOW_1.maxTimestamp().plus(ALLOWED_LATENESS + 1), runner); + assertTrue( + stateInternals.isEmptyForTesting( + stateInternals.state(windowNamespace(WINDOW_1), stateTag))); + + assertEquals( + 2, (int) stateInternals.state(windowNamespace(WINDOW_2), stateTag).read()); + + // advance watermark past end of WINDOW_2 + allowed lateness + advanceInputWatermark( + timerInternals, WINDOW_2.maxTimestamp().plus(ALLOWED_LATENESS + 1), runner); + assertTrue( + stateInternals.isEmptyForTesting( + stateInternals.state(windowNamespace(WINDOW_2), stateTag))); + } + + private DoFnRunner<KV<String, Integer>, Integer> getDoFnRunner( + DoFn<KV<String, Integer>, Integer> fn) { + return new SimpleDoFnRunner<>( + null, + fn, + NullSideInputReader.empty(), + null, + null, + Collections.<TupleTag<?>>emptyList(), + mockStepContext, + null, + WINDOWING_STRATEGY); + } + + private static void advanceInputWatermark( + InMemoryTimerInternals timerInternals, + Instant newInputWatermark, + DoFnRunner<?, ?> toTrigger) throws Exception { + timerInternals.advanceInputWatermark(newInputWatermark); + TimerInternals.TimerData timer; + while ((timer = timerInternals.removeNextEventTimer()) != null) { + StateNamespace namespace = timer.getNamespace(); + checkArgument(namespace instanceof StateNamespaces.WindowNamespace); + BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow(); + toTrigger.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain()); + } + } + + private static class MyDoFn extends DoFn<KV<String, Integer>, Integer> { + + public final String stateId = "foo"; + + @StateId(stateId) + public final StateSpec<Object, ValueState<Integer>> intState = + StateSpecs.value(VarIntCoder.of()); + + @ProcessElement + public void processElement( + ProcessContext c, @StateId(stateId) ValueState<Integer> state) { + Integer currentValue = MoreObjects.firstNonNull(state.read(), 0); + state.write(currentValue + 1); + } + }; + + private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> { + private final String name; + private long sum = 0; + + public InMemoryLongSumAggregator(String name) { + this.name = name; + } + + @Override + public void addValue(Long value) { + sum += value; + } + + @Override + public String getName() { + return name; + } + + @Override + public Combine.CombineFn<Long, ?, Long> getCombineFn() { + return Sum.ofLongs(); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/6e9f7ad4/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 29b6fbc..c4622ba 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -291,6 +291,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT> stepContext, windowingStrategy, ((GroupAlsoByWindowViaWindowSetNewDoFn) doFn).getDroppedDueToLatenessAggregator()); + } else if (keyCoder != null) { + // It is a stateful DoFn + doFnRunner = DoFnRunners.defaultStatefulDoFnRunner( + doFn, doFnRunner, stepContext, aggregatorFactory, windowingStrategy); } pushbackDoFnRunner =