Repository: beam
Updated Branches:
  refs/heads/master d66029caf -> 79b1395c2


[BEAM-1517] Garbage collect user state in Flink Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6e9f7ad4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6e9f7ad4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6e9f7ad4

Branch: refs/heads/master
Commit: 6e9f7ad46bcb341d37ac5d7804465dd82ef3b56e
Parents: d66029c
Author: JingsongLi <lzljs3620...@aliyun.com>
Authored: Tue Feb 28 16:08:38 2017 +0800
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Wed Mar 1 17:31:36 2017 +0100

----------------------------------------------------------------------
 .../apache/beam/runners/core/DoFnRunners.java   |  39 +++
 .../beam/runners/core/StatefulDoFnRunner.java   | 233 +++++++++++++++++
 .../runners/core/StatefulDoFnRunnerTest.java    | 255 +++++++++++++++++++
 .../wrappers/streaming/DoFnOperator.java        |   4 +
 4 files changed, 531 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/6e9f7ad4/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
index f3972ae..9455eea 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java
@@ -19,9 +19,15 @@ package org.apache.beam.runners.core;
 
 import java.util.List;
 import org.apache.beam.runners.core.ExecutionContext.StepContext;
+import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer;
+import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner;
+import 
org.apache.beam.runners.core.StatefulDoFnRunner.StateInternalsStateCleaner;
+import 
org.apache.beam.runners.core.StatefulDoFnRunner.TimeInternalsCleanupTimer;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -116,4 +122,37 @@ public class DoFnRunners {
         stepContext.timerInternals(),
         droppedDueToLatenessAggregator);
   }
+
+  /**
+   * Returns an implementation of {@link DoFnRunner} that handles
+   * late data dropping and garbage collection for stateful {@link DoFn DoFns}.
+   *
+   * <p>It registers a GC timer via TimerInternals and clears all state via 
StateInternals.
+   */
+  public static <InputT, OutputT, W extends BoundedWindow>
+      DoFnRunner<InputT, OutputT> defaultStatefulDoFnRunner(
+          DoFn<InputT, OutputT> fn,
+          DoFnRunner<InputT, OutputT> doFnRunner,
+          StepContext stepContext,
+          AggregatorFactory aggregatorFactory,
+          WindowingStrategy<?, ?> windowingStrategy) {
+    Aggregator<Long, Long> droppedDueToLateness = 
aggregatorFactory.createAggregatorForDoFn(
+        fn.getClass(), stepContext, 
StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER,
+        Sum.ofLongs());
+
+    CleanupTimer cleanupTimer =
+        new TimeInternalsCleanupTimer(stepContext.timerInternals(), 
windowingStrategy);
+
+    Coder<W> windowCoder = (Coder<W>) 
windowingStrategy.getWindowFn().windowCoder();
+    StateCleaner<W> stateCleaner =
+        new StateInternalsStateCleaner<>(fn, stepContext.stateInternals(), 
windowCoder);
+
+    return new StatefulDoFnRunner<>(
+        doFnRunner,
+        windowingStrategy,
+        cleanupTimer,
+        stateCleaner,
+        droppedDueToLateness);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/6e9f7ad4/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
new file mode 100644
index 0000000..154d8bc
--- /dev/null
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import java.util.Map;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
+import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.joda.time.Instant;
+
+/**
+ * A customized {@link DoFnRunner} that handles late data dropping and garbage 
collection for
+ * stateful {@link DoFn DoFns}. It registers a GC timer in {@link 
#processElement(WindowedValue)}
+ * and does cleanup in {@link #onTimer(String, BoundedWindow, Instant, 
TimeDomain)}.
+ *
+ * @param <InputT> the type of the {@link DoFn} (main) input elements
+ * @param <OutputT> the type of the {@link DoFn} (main) output elements
+ */
+public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
+    implements DoFnRunner<InputT, OutputT> {
+
+  public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
+  public static final String DROPPED_DUE_TO_LATENESS_COUNTER = 
"StatefulParDoDropped";
+
+  private final DoFnRunner<InputT, OutputT> doFnRunner;
+  private final WindowingStrategy<?, ?> windowingStrategy;
+  private final Aggregator<Long, Long> droppedDueToLateness;
+  private final CleanupTimer cleanupTimer;
+  private final StateCleaner stateCleaner;
+
+  public StatefulDoFnRunner(
+      DoFnRunner<InputT, OutputT> doFnRunner,
+      WindowingStrategy<?, ?> windowingStrategy,
+      CleanupTimer cleanupTimer,
+      StateCleaner<W> stateCleaner,
+      Aggregator<Long, Long> droppedDueToLateness) {
+    this.doFnRunner = doFnRunner;
+    this.windowingStrategy = windowingStrategy;
+    this.cleanupTimer = cleanupTimer;
+    this.stateCleaner = stateCleaner;
+    WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
+    rejectMergingWindowFn(windowFn);
+    this.droppedDueToLateness = droppedDueToLateness;
+  }
+
+  private void rejectMergingWindowFn(WindowFn<?, ?> windowFn) {
+    if (!(windowFn instanceof NonMergingWindowFn)) {
+      throw new UnsupportedOperationException(
+          "MergingWindowFn is not supported for stateful DoFns, WindowFn is: "
+              + windowFn);
+    }
+  }
+
+  @Override
+  public void startBundle() {
+    doFnRunner.startBundle();
+  }
+
+  @Override
+  public void processElement(WindowedValue<InputT> compressedElem) {
+
+    // StatefulDoFnRunner always observes windows, so we need to explode
+    for (WindowedValue<InputT> value : compressedElem.explodeWindows()) {
+
+      BoundedWindow window = value.getWindows().iterator().next();
+
+      if (!dropLateData(window)) {
+        cleanupTimer.setForWindow(window);
+        doFnRunner.processElement(value);
+      }
+    }
+  }
+
+  private boolean dropLateData(BoundedWindow window) {
+    Instant gcTime = 
window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+    Instant inputWM = cleanupTimer.currentInputWatermarkTime();
+    if (gcTime.isBefore(inputWM)) {
+      // The element is too late for this window.
+      droppedDueToLateness.addValue(1L);
+      WindowTracing.debug(
+          "StatefulDoFnRunner.processElement/onTimer: Dropping element for 
window:{} "
+              + "since too far behind inputWatermark:{}", window, inputWM);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  @Override
+  public void onTimer(
+      String timerId, BoundedWindow window, Instant timestamp, TimeDomain 
timeDomain) {
+    boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
+    Instant gcTime = 
window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+    if (isEventTimer && GC_TIMER_ID.equals(timerId) && 
gcTime.equals(timestamp)) {
+      stateCleaner.clearForWindow(window);
+      // TODO: the DoFn's onWindowExpiration should be invoked here.
+    } else {
+      if (isEventTimer || !dropLateData(window)) {
+        doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
+      }
+    }
+  }
+
+  @Override
+  public void finishBundle() {
+    doFnRunner.finishBundle();
+  }
+
+  /**
+   * A timer abstraction for deciding when to clean up the state of a window.
+   *
+   * <p>A runner might either (a) already know that it always has a timer set
+   * for the expiration time or (b) not need a timer at all because it is
+   * a batch runner that discards state when it is done.
+   */
+  public interface CleanupTimer {
+
+    /**
+     * Return the current, local input watermark timestamp for this computation
+     * in the {@link TimeDomain#EVENT_TIME} time domain.
+     */
+    Instant currentInputWatermarkTime();
+
+    /**
+     * Sets a timer that fires at the garbage collection time of the given window.
+     */
+    void setForWindow(BoundedWindow window);
+  }
+
+  /**
+   * A cleaner to clean all states of the window.
+   */
+  public interface StateCleaner<W extends BoundedWindow> {
+
+    void clearForWindow(W window);
+  }
+
+  /**
+   * A {@link CleanupTimer} implemented by TimerInternals.
+   */
+  public static class TimeInternalsCleanupTimer implements CleanupTimer {
+
+    private final TimerInternals timerInternals;
+    private final WindowingStrategy<?, ?> windowingStrategy;
+    private final Coder<BoundedWindow> windowCoder;
+
+    public TimeInternalsCleanupTimer(
+        TimerInternals timerInternals,
+        WindowingStrategy<?, ?> windowingStrategy) {
+      this.windowingStrategy = windowingStrategy;
+      WindowFn<?, ?> windowFn = windowingStrategy.getWindowFn();
+      windowCoder = (Coder<BoundedWindow>) windowFn.windowCoder();
+      this.timerInternals = timerInternals;
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return timerInternals.currentInputWatermarkTime();
+    }
+
+    @Override
+    public void setForWindow(BoundedWindow window) {
+      Instant gcTime = 
window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      timerInternals.setTimer(StateNamespaces.window(windowCoder, window),
+          GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
+    }
+
+  }
+
+  /**
+   * A {@link StateCleaner} implemented by StateInternals.
+   */
+  public static class StateInternalsStateCleaner<W extends BoundedWindow>
+      implements StateCleaner<W> {
+
+    private final DoFn<?, ?> fn;
+    private final DoFnSignature signature;
+    private final StateInternals<?> stateInternals;
+    private final Coder<W> windowCoder;
+
+    public StateInternalsStateCleaner(
+        DoFn<?, ?> fn,
+        StateInternals<?> stateInternals,
+        Coder<W> windowCoder) {
+      this.fn = fn;
+      this.signature = DoFnSignatures.getSignature(fn.getClass());
+      this.stateInternals = stateInternals;
+      this.windowCoder = windowCoder;
+    }
+
+    @Override
+    public void clearForWindow(W window) {
+      for (Map.Entry<String, DoFnSignature.StateDeclaration> entry :
+          signature.stateDeclarations().entrySet()) {
+        try {
+          StateSpec<?, ?> spec = (StateSpec<?, ?>) 
entry.getValue().field().get(fn);
+          State state = 
stateInternals.state(StateNamespaces.window(windowCoder, window),
+              StateTags.tagForSpec(entry.getKey(), (StateSpec) spec));
+          state.clear();
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6e9f7ad4/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
new file mode 100644
index 0000000..54ac77e
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/StatefulDoFnRunnerTest.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+import com.google.common.base.MoreObjects;
+import java.util.Collections;
+import org.apache.beam.runners.core.BaseExecutionContext.StepContext;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.NullSideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateSpec;
+import org.apache.beam.sdk.util.state.StateSpecs;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/** Tests for {@link StatefulDoFnRunner}. */
+@RunWith(JUnit4.class)
+public class StatefulDoFnRunnerTest {
+
+  private static final long WINDOW_SIZE = 10;
+  private static final long ALLOWED_LATENESS = 1;
+
+  private static final WindowingStrategy<?, ?> WINDOWING_STRATEGY =
+      WindowingStrategy
+          .of(FixedWindows.of(Duration.millis(WINDOW_SIZE)))
+          .withAllowedLateness(Duration.millis(ALLOWED_LATENESS));
+
+  private static final IntervalWindow WINDOW_1 =
+      new IntervalWindow(new Instant(0), new Instant(10));
+
+  private static final IntervalWindow WINDOW_2 =
+      new IntervalWindow(new Instant(10), new Instant(20));
+
+  @Mock StepContext mockStepContext;
+
+  private InMemoryLongSumAggregator droppedDueToLateness;
+  private AggregatorFactory aggregatorFactory;
+  private InMemoryStateInternals<String> stateInternals;
+  private InMemoryTimerInternals timerInternals;
+
+  private static StateNamespace windowNamespace(IntervalWindow window) {
+    return StateNamespaces.<IntervalWindow>window(
+        (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder(), window);
+  }
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    when(mockStepContext.timerInternals()).thenReturn(timerInternals);
+    droppedDueToLateness = new 
InMemoryLongSumAggregator("droppedDueToLateness");
+
+    aggregatorFactory = new AggregatorFactory() {
+      @Override
+      public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> 
createAggregatorForDoFn(
+          Class<?> fnClass, ExecutionContext.StepContext stepContext, String 
aggregatorName,
+          Combine.CombineFn<InputT, AccumT, OutputT> combine) {
+        return (Aggregator<InputT, OutputT>) droppedDueToLateness;
+      }
+    };
+
+    stateInternals = new InMemoryStateInternals<>("hello");
+    timerInternals = new InMemoryTimerInternals();
+
+    when(mockStepContext.stateInternals()).thenReturn((StateInternals) 
stateInternals);
+    when(mockStepContext.timerInternals()).thenReturn(timerInternals);
+  }
+
+  @Test
+  public void testLateDropping() throws Exception {
+
+    timerInternals.advanceInputWatermark(new 
Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    timerInternals.advanceOutputWatermark(new 
Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
+
+    DoFn<KV<String, Integer>, Integer> fn = new MyDoFn();
+
+    DoFnRunner<KV<String, Integer>, Integer> runner = 
DoFnRunners.defaultStatefulDoFnRunner(
+        fn, getDoFnRunner(fn), mockStepContext, aggregatorFactory, 
WINDOWING_STRATEGY);
+
+    runner.startBundle();
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new Instant(0L 
+ WINDOW_SIZE));
+    Instant timestamp = new Instant(0);
+
+    runner.processElement(
+        WindowedValue.of(KV.of("hello", 1), timestamp, window, 
PaneInfo.NO_FIRING));
+    assertEquals(1L, droppedDueToLateness.sum);
+
+    runner.onTimer("processTimer", window, timestamp, 
TimeDomain.PROCESSING_TIME);
+    assertEquals(2L, droppedDueToLateness.sum);
+
+    runner.onTimer("synchronizedProcessTimer", window, timestamp,
+        TimeDomain.SYNCHRONIZED_PROCESSING_TIME);
+    assertEquals(3L, droppedDueToLateness.sum);
+
+    runner.finishBundle();
+  }
+
+  @Test
+  public void testGarbageCollect() throws Exception {
+    timerInternals.advanceInputWatermark(new Instant(1L));
+
+    MyDoFn fn = new MyDoFn();
+    StateTag<Object, ValueState<Integer>> stateTag = 
StateTags.tagForSpec(fn.stateId, fn.intState);
+
+    DoFnRunner<KV<String, Integer>, Integer> runner = 
DoFnRunners.defaultStatefulDoFnRunner(
+        fn, getDoFnRunner(fn), mockStepContext, aggregatorFactory, 
WINDOWING_STRATEGY);
+
+    Instant elementTime = new Instant(1);
+
+    // first element, key is hello, WINDOW_1
+    runner.processElement(
+        WindowedValue.of(KV.of("hello", 1), elementTime, WINDOW_1, 
PaneInfo.NO_FIRING));
+
+    assertEquals(
+        1, (int) stateInternals.state(windowNamespace(WINDOW_1), 
stateTag).read());
+
+    // second element, key is hello, WINDOW_2
+    runner.processElement(
+        WindowedValue.of(
+            KV.of("hello", 1), elementTime.plus(WINDOW_SIZE), WINDOW_2, 
PaneInfo.NO_FIRING));
+
+    runner.processElement(
+        WindowedValue.of(
+            KV.of("hello", 1), elementTime.plus(WINDOW_SIZE), WINDOW_2, 
PaneInfo.NO_FIRING));
+
+    assertEquals(
+        2, (int) stateInternals.state(windowNamespace(WINDOW_2), 
stateTag).read());
+
+    // advance watermark past end of WINDOW_1 + allowed lateness
+    advanceInputWatermark(
+        timerInternals, WINDOW_1.maxTimestamp().plus(ALLOWED_LATENESS + 1), 
runner);
+    assertTrue(
+        stateInternals.isEmptyForTesting(
+            stateInternals.state(windowNamespace(WINDOW_1), stateTag)));
+
+    assertEquals(
+        2, (int) stateInternals.state(windowNamespace(WINDOW_2), 
stateTag).read());
+
+    // advance watermark past end of WINDOW_2 + allowed lateness
+    advanceInputWatermark(
+        timerInternals, WINDOW_2.maxTimestamp().plus(ALLOWED_LATENESS + 1), 
runner);
+    assertTrue(
+        stateInternals.isEmptyForTesting(
+            stateInternals.state(windowNamespace(WINDOW_2), stateTag)));
+  }
+
+  private DoFnRunner<KV<String, Integer>, Integer> getDoFnRunner(
+      DoFn<KV<String, Integer>, Integer> fn) {
+    return new SimpleDoFnRunner<>(
+        null,
+        fn,
+        NullSideInputReader.empty(),
+        null,
+        null,
+        Collections.<TupleTag<?>>emptyList(),
+        mockStepContext,
+        null,
+        WINDOWING_STRATEGY);
+  }
+
+  private static void advanceInputWatermark(
+      InMemoryTimerInternals timerInternals,
+      Instant newInputWatermark,
+      DoFnRunner<?, ?> toTrigger) throws Exception {
+    timerInternals.advanceInputWatermark(newInputWatermark);
+    TimerInternals.TimerData timer;
+    while ((timer = timerInternals.removeNextEventTimer()) != null) {
+      StateNamespace namespace = timer.getNamespace();
+      checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+      BoundedWindow window = ((StateNamespaces.WindowNamespace) 
namespace).getWindow();
+      toTrigger.onTimer(timer.getTimerId(), window, timer.getTimestamp(), 
timer.getDomain());
+    }
+  }
+
+  private static class MyDoFn extends DoFn<KV<String, Integer>, Integer> {
+
+    public final String stateId = "foo";
+
+    @StateId(stateId)
+    public final StateSpec<Object, ValueState<Integer>> intState =
+        StateSpecs.value(VarIntCoder.of());
+
+    @ProcessElement
+    public void processElement(
+        ProcessContext c, @StateId(stateId) ValueState<Integer> state) {
+      Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
+      state.write(currentValue + 1);
+    }
+  };
+
+  private static class InMemoryLongSumAggregator implements Aggregator<Long, 
Long> {
+    private final String name;
+    private long sum = 0;
+
+    public InMemoryLongSumAggregator(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public void addValue(Long value) {
+      sum += value;
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public Combine.CombineFn<Long, ?, Long> getCombineFn() {
+      return Sum.ofLongs();
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/6e9f7ad4/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 29b6fbc..c4622ba 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -291,6 +291,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
           stepContext,
           windowingStrategy,
           ((GroupAlsoByWindowViaWindowSetNewDoFn) 
doFn).getDroppedDueToLatenessAggregator());
+    } else if (keyCoder != null) {
+      // It is a stateful DoFn
+      doFnRunner = DoFnRunners.defaultStatefulDoFnRunner(
+          doFn, doFnRunner, stepContext, aggregatorFactory, windowingStrategy);
     }
 
     pushbackDoFnRunner =

Reply via email to