[BEAM-1393/1394/1445] Update to Flink 1.2.0 and fix resulting problems

We now use the Flink InternalTimerService for all our timer needs and we
use Flink Broadcast state to store side inputs.

Changing those was both necessary and easier to do in one commit while
updating to Flink 1.2.
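
As background, here is a minimal, hypothetical sketch of the Flink 1.2
internal timer API that this change builds on. Only getInternalTimerService,
Triggerable, InternalTimer, and the "beam-timer" name appear in the diff
below; the operator class, the VoidNamespace namespace type, and the
timestamps are illustrative assumptions, not part of this commit:

    import org.apache.flink.runtime.state.VoidNamespace;
    import org.apache.flink.runtime.state.VoidNamespaceSerializer;
    import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
    import org.apache.flink.streaming.api.operators.InternalTimer;
    import org.apache.flink.streaming.api.operators.InternalTimerService;
    import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
    import org.apache.flink.streaming.api.operators.Triggerable;
    import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

    public class TimerSketchOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T>, Triggerable<Object, VoidNamespace> {

      private transient InternalTimerService<VoidNamespace> timers;

      @Override
      public void open() throws Exception {
        super.open();
        // The serializer passed here decides how timer namespaces are
        // checkpointed; DoFnOperator passes a CoderTypeSerializer<TimerData>.
        timers = getInternalTimerService(
            "beam-timer", VoidNamespaceSerializer.INSTANCE, this);
      }

      @Override
      public void processElement(StreamRecord<T> element) throws Exception {
        // Register an event-time timer; Flink checkpoints and restores it
        // along with the keyed state, which is what lets DoFnOperator drop
        // its hand-rolled timer handling.
        timers.registerEventTimeTimer(VoidNamespace.INSTANCE, element.getTimestamp() + 1);
        output.collect(element);
      }

      @Override
      public void onEventTime(InternalTimer<Object, VoidNamespace> timer) {
        // Called when the watermark passes the timer's timestamp.
      }

      @Override
      public void onProcessingTime(InternalTimer<Object, VoidNamespace> timer) {
        // Called by the processing-time service.
      }
    }

Side inputs now live in operator ("broadcast") state via the new
FlinkBroadcastStateInternals instead of a manually managed side-input
state backend.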


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7d32b93e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7d32b93e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7d32b93e

Branch: refs/heads/master
Commit: 7d32b93e337b21286c5269956b45f76c623f8518
Parents: f03f6ac
Author: JingsongLi <lzljs3620...@aliyun.com>
Authored: Tue Feb 7 16:11:12 2017 +0800
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Feb 16 12:13:33 2017 +0100

----------------------------------------------------------------------
 runners/flink/pom.xml                           |    2 +-
 runners/flink/runner/pom.xml                    |    6 +
 .../flink/DefaultParallelismFactory.java        |    3 +-
 .../FlinkStreamingTransformTranslators.java     |   20 +-
 .../FlinkStreamingTranslationContext.java       |    8 +
 .../wrappers/streaming/DoFnOperator.java        |  408 ++++---
 .../wrappers/streaming/FlinkStateInternals.java | 1055 ------------------
 .../wrappers/streaming/WindowDoFnOperator.java  |  389 +------
 .../streaming/io/BoundedSourceWrapper.java      |    5 -
 .../streaming/io/UnboundedSourceWrapper.java    |   23 +-
 .../state/FlinkBroadcastStateInternals.java     |  865 ++++++++++++++
 .../state/FlinkKeyGroupStateInternals.java      |  487 ++++++++
 .../state/FlinkSplitStateInternals.java         |  260 +++++
 .../streaming/state/FlinkStateInternals.java    | 1053 +++++++++++++++++
 .../state/KeyGroupCheckpointedOperator.java     |   35 +
 .../state/KeyGroupRestoringOperator.java        |   32 +
 .../wrappers/streaming/state/package-info.java  |   22 +
 .../beam/runners/flink/PipelineOptionsTest.java |   25 +-
 .../flink/streaming/DoFnOperatorTest.java       |  109 +-
 .../FlinkBroadcastStateInternalsTest.java       |  245 ++++
 .../FlinkKeyGroupStateInternalsTest.java        |  262 +++++
 .../streaming/FlinkSplitStateInternalsTest.java |  101 ++
 .../streaming/FlinkStateInternalsTest.java      |   27 +-
 .../streaming/UnboundedSourceWrapperTest.java   |   16 +
 24 files changed, 3814 insertions(+), 1644 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml
index 6f4236e..0030f61 100644
--- a/runners/flink/pom.xml
+++ b/runners/flink/pom.xml
@@ -39,7 +39,7 @@
   <properties>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
-    <flink.version>1.1.2</flink.version>
+    <flink.version>1.2.0</flink.version>
   </properties>
 
   <repositories>

http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/runner/pom.xml
----------------------------------------------------------------------
diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml
index f254d9a..fbe2686 100644
--- a/runners/flink/runner/pom.xml
+++ b/runners/flink/runner/pom.xml
@@ -220,6 +220,12 @@
       <artifactId>flink-test-utils_2.10</artifactId>
       <version>${flink.version}</version>
       <scope>test</scope>
+        <exclusions>
+            <exclusion>
+                <artifactId>apacheds-jdbm1</artifactId>
+                <groupId>org.apache.directory.jdbm</groupId>
+            </exclusion>
+        </exclusions>
     </dependency>
 
     <!-- Optional Pipeline Registration -->

http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
index 2fe4569..b745f0b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java
@@ -33,6 +33,7 @@ import org.apache.flink.configuration.GlobalConfiguration;
 public class DefaultParallelismFactory implements DefaultValueFactory<Integer> {
   @Override
   public Integer create(PipelineOptions options) {
-    return GlobalConfiguration.getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
+    return GlobalConfiguration.loadConfiguration()
+        .getInteger(ConfigConstants.DEFAULT_PARALLELISM_KEY, 1);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index b9b5059..2131729 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -347,8 +347,7 @@ public class FlinkStreamingTransformTranslators {
       @SuppressWarnings("unchecked")
      PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform);
 
-      TypeInformation<WindowedValue<InputT>> inputTypeInfo =
-          context.getTypeInfo(inputPCollection);
+      Coder<WindowedValue<InputT>> inputCoder = context.getCoder(inputPCollection);
 
       DataStream<WindowedValue<InputT>> inputDataStream =
           context.getInputDataStream(context.getInput(transform));
@@ -368,7 +367,7 @@ public class FlinkStreamingTransformTranslators {
         DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
             new DoFnOperator<>(
                 transform.getFn(),
-                inputTypeInfo,
+                inputCoder,
                 new TupleTag<OutputT>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(),
@@ -389,7 +388,7 @@ public class FlinkStreamingTransformTranslators {
         DoFnOperator<InputT, OutputT, WindowedValue<OutputT>> doFnOperator =
             new DoFnOperator<>(
                 transform.getFn(),
-                inputTypeInfo,
+                inputCoder,
                 new TupleTag<OutputT>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<OutputT>>(),
@@ -538,8 +537,7 @@ public class FlinkStreamingTransformTranslators {
       @SuppressWarnings("unchecked")
      PCollection<InputT> inputPCollection = (PCollection<InputT>) context.getInput(transform);
 
-      TypeInformation<WindowedValue<InputT>> inputTypeInfo =
-          context.getTypeInfo(inputPCollection);
      Coder<WindowedValue<InputT>> inputCoder = context.getCoder(inputPCollection);
 
       DataStream<WindowedValue<InputT>> inputDataStream =
           context.getInputDataStream(context.getInput(transform));
@@ -559,7 +557,7 @@ public class FlinkStreamingTransformTranslators {
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
             new DoFnOperator<>(
                 transform.getFn(),
-                inputTypeInfo,
+                inputCoder,
                 transform.getMainOutputTag(),
                 transform.getSideOutputTags().getAll(),
                 new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
@@ -584,7 +582,7 @@ public class FlinkStreamingTransformTranslators {
         DoFnOperator<InputT, OutputT, RawUnionValue> doFnOperator =
             new DoFnOperator<>(
                 transform.getFn(),
-                inputTypeInfo,
+                inputCoder,
                 transform.getMainOutputTag(),
                 transform.getSideOutputTags().getAll(),
                 new DoFnOperator.MultiOutputOutputManagerFactory(tagsToLabels),
@@ -813,7 +811,7 @@ public class FlinkStreamingTransformTranslators {
       WindowDoFnOperator<K, InputT, Iterable<InputT>> doFnOperator =
           new WindowDoFnOperator<>(
               reduceFn,
-              (TypeInformation) workItemTypeInfo,
+              (Coder) windowedWorkItemCoder,
               new TupleTag<KV<K, Iterable<InputT>>>("main output"),
               Collections.<TupleTag<?>>emptyList(),
               outputManagerFactory,
@@ -913,7 +911,7 @@ public class FlinkStreamingTransformTranslators {
         WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
             new WindowDoFnOperator<>(
                 reduceFn,
-                (TypeInformation) workItemTypeInfo,
+                (Coder) windowedWorkItemCoder,
                 new TupleTag<KV<K, OutputT>>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),
@@ -938,7 +936,7 @@ public class FlinkStreamingTransformTranslators {
         WindowDoFnOperator<K, InputT, OutputT> doFnOperator =
             new WindowDoFnOperator<>(
                 reduceFn,
-                (TypeInformation) workItemTypeInfo,
+                (Coder) windowedWorkItemCoder,
                 new TupleTag<KV<K, OutputT>>("main output"),
                 Collections.<TupleTag<?>>emptyList(),
                new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<KV<K, OutputT>>>(),

http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
index 7932f68..03698d5 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java
@@ -89,6 +89,14 @@ public class FlinkStreamingTranslationContext {
     this.currentTransform = currentTransform;
   }
 
+  public <T> Coder<WindowedValue<T>> getCoder(PCollection<T> collection) {
+    Coder<T> valueCoder = collection.getCoder();
+
+    return WindowedValue.getFullCoder(
+        valueCoder,
+        collection.getWindowingStrategy().getWindowFn().windowCoder());
+  }
+
   @SuppressWarnings("unchecked")
  public <T> TypeInformation<WindowedValue<T>> getTypeInfo(PCollection<T> collection) {
     Coder<T> valueCoder = collection.getCoder();

http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index c66f026..8a3dad2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -17,18 +17,20 @@
  */
 package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
-import static com.google.common.base.Preconditions.checkState;
+import static org.apache.flink.util.Preconditions.checkArgument;
 
+import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.beam.runners.core.AggregatorFactory;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -37,11 +39,21 @@ import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn;
 import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
 import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateNamespaces;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.core.StateTags;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
 import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
 import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkBroadcastStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkKeyGroupStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkSplitStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.state.KeyGroupCheckpointedOperator;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
@@ -50,34 +62,33 @@ import org.apache.beam.sdk.transforms.join.RawUnionValue;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
 import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.BagState;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ReducingState;
-import org.apache.flink.api.common.state.ReducingStateDescriptor;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.common.typeutils.base.VoidSerializer;
-import org.apache.flink.api.java.typeutils.GenericTypeInfo;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.apache.flink.runtime.state.KvStateSnapshot;
-import org.apache.flink.runtime.state.StateHandle;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
+import org.apache.flink.runtime.state.KeyGroupsList;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
+import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
+import org.joda.time.Instant;
 
 /**
  * Flink operator for executing {@link DoFn DoFns}.
@@ -90,7 +101,8 @@ import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
 public class DoFnOperator<InputT, FnOutputT, OutputT>
     extends AbstractStreamOperator<OutputT>
     implements OneInputStreamOperator<WindowedValue<InputT>, OutputT>,
-      TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT> {
+      TwoInputStreamOperator<WindowedValue<InputT>, RawUnionValue, OutputT>,
+    KeyGroupCheckpointedOperator, Triggerable<Object, TimerData> {
 
   protected DoFn<InputT, FnOutputT> doFn;
 
@@ -120,21 +132,27 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected transient long currentOutputWatermark;
 
-  private transient AbstractStateBackend sideInputStateBackend;
+  private transient StateTag<Object, BagState<WindowedValue<InputT>>> pushedBackTag;
+
+  protected transient FlinkStateInternals<?> stateInternals;
 
-  private final ReducingStateDescriptor<Long> pushedBackWatermarkDescriptor;
+  private Coder<WindowedValue<InputT>> inputCoder;
 
-  private final ListStateDescriptor<WindowedValue<InputT>> pushedBackDescriptor;
+  private final Coder<?> keyCoder;
 
-  private transient Map<String, KvStateSnapshot<?, ?, ?, ?, ?>> restoredSideInputState;
+  private final TimerInternals.TimerDataCoder timerCoder;
 
-  protected transient FlinkStateInternals<?> stateInternals;
+  protected transient HeapInternalTimerService<?, TimerInternals.TimerData> timerService;
 
-  private final Coder<?> keyCoder;
+  protected transient FlinkTimerInternals timerInternals;
+
+  private transient StateInternals<?> pushbackStateInternals;
+
+  private transient Optional<Long> pushedBackWatermark;
 
   public DoFnOperator(
       DoFn<InputT, FnOutputT> doFn,
-      TypeInformation<WindowedValue<InputT>> inputType,
+      Coder<WindowedValue<InputT>> inputCoder,
       TupleTag<FnOutputT> mainOutputTag,
       List<TupleTag<?>> sideOutputTags,
       OutputManagerFactory<OutputT> outputManagerFactory,
@@ -144,6 +162,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       PipelineOptions options,
       Coder<?> keyCoder) {
     this.doFn = doFn;
+    this.inputCoder = inputCoder;
     this.mainOutputTag = mainOutputTag;
     this.sideOutputTags = sideOutputTags;
     this.sideInputTagMapping = sideInputTagMapping;
@@ -152,18 +171,12 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     this.windowingStrategy = windowingStrategy;
     this.outputManagerFactory = outputManagerFactory;
 
-    this.pushedBackWatermarkDescriptor =
-        new ReducingStateDescriptor<>(
-            "pushed-back-elements-watermark-hold",
-            new LongMinReducer(),
-            LongSerializer.INSTANCE);
-
-    this.pushedBackDescriptor =
-        new ListStateDescriptor<>("pushed-back-values", inputType);
-
     setChainingStrategy(ChainingStrategy.ALWAYS);
 
     this.keyCoder = keyCoder;
+
+    this.timerCoder =
+        TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());
   }
 
   protected ExecutionContext.StepContext createStepContext() {
@@ -181,7 +194,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     super.open();
 
     currentInputWatermark = Long.MIN_VALUE;
-    currentOutputWatermark = currentInputWatermark;
+    currentOutputWatermark = Long.MIN_VALUE;
 
     AggregatorFactory aggregatorFactory = new AggregatorFactory() {
       @Override
@@ -207,40 +220,47 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     sideInputReader = NullSideInputReader.of(sideInputs);
 
     if (!sideInputs.isEmpty()) {
-      String operatorIdentifier =
-          this.getClass().getSimpleName() + "_"
-              + getRuntimeContext().getIndexOfThisSubtask() + "_sideInput";
-
-      sideInputStateBackend = this
-          .getContainingTask()
-          .createStateBackend(operatorIdentifier,
-              new GenericTypeInfo<>(ByteBuffer.class).createSerializer(new ExecutionConfig()));
-
-      checkState(sideInputStateBackend != null, "Side input state backend cannot be null");
-
-      if (restoredSideInputState != null) {
-        @SuppressWarnings("unchecked,rawtypes")
-        HashMap<String, KvStateSnapshot> castRestored = (HashMap) restoredSideInputState;
-        sideInputStateBackend.injectKeyValueStateSnapshots(castRestored);
-        restoredSideInputState = null;
-      }
 
-      sideInputStateBackend.setCurrentKey(
-          ByteBuffer.wrap(CoderUtils.encodeToByteArray(VoidCoder.of(), null)));
+      pushedBackTag = StateTags.bag("pushed-back-values", inputCoder);
 
-      StateInternals<Void> sideInputStateInternals =
-          new FlinkStateInternals<>(sideInputStateBackend, VoidCoder.of());
+      FlinkBroadcastStateInternals sideInputStateInternals =
+          new FlinkBroadcastStateInternals<>(
+              getContainingTask().getIndexInSubtaskGroup(), getOperatorStateBackend());
 
       sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
       sideInputReader = sideInputHandler;
+
+      // may already have been initialized in initializeState()
+      if (pushbackStateInternals == null) {
+        if (keyCoder != null) {
+          pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
+              getKeyedStateBackend());
+        } else {
+          pushbackStateInternals =
+              new FlinkSplitStateInternals<Object>(getOperatorStateBackend());
+        }
+      }
+
+      pushedBackWatermark = Optional.absent();
+
     }
 
     outputManager = outputManagerFactory.create(output);
 
+    // stateful ParDo or WindowDoFn
     if (keyCoder != null) {
-      stateInternals = new FlinkStateInternals<>(getStateBackend(), keyCoder);
+      stateInternals = new FlinkStateInternals<>((KeyedStateBackend) getKeyedStateBackend(),
+          keyCoder);
+
+      timerService = (HeapInternalTimerService<?, TimerInternals.TimerData>)
+          getInternalTimerService("beam-timer", new CoderTypeSerializer<>(timerCoder), this);
+
+      timerInternals = new FlinkTimerInternals();
+
     }
 
+    // WindowDoFnOperator needs state and timers to create its DoFn, so we
+    // must wait until the StateInternals and TimerInternals are ready.
     this.doFn = getDoFn();
     doFnInvoker = DoFnInvokers.invokerFor(doFn);
 
@@ -290,16 +310,29 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     }
 
     try {
-      Long result = sideInputStateBackend.getPartitionedState(
-          null,
-          VoidSerializer.INSTANCE,
-          pushedBackWatermarkDescriptor).get();
-      return result != null ? result : Long.MAX_VALUE;
+      checkInitPushedBackWatermark();
+      return pushedBackWatermark.get();
     } catch (Exception e) {
       throw new RuntimeException("Error retrieving pushed back watermark state.", e);
     }
   }
 
+  private void checkInitPushedBackWatermark() {
+    // Initialize and restore from the pushed-back state. This is not done in
+    // initializeState() because the operator state is not ready at that point.
+    if (!pushedBackWatermark.isPresent()) {
+
+      BagState<WindowedValue<InputT>> pushedBack =
+          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+      long min = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);
+      for (WindowedValue<InputT> value : pushedBack.read()) {
+        min = Math.min(min, value.getTimestamp().getMillis());
+      }
+      setPushedBackWatermark(min);
+    }
+  }
+
   @Override
   public final void processElement(
       StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
@@ -308,6 +341,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     pushbackDoFnRunner.finishBundle();
   }
 
+  private void setPushedBackWatermark(long watermark) {
+    pushedBackWatermark = Optional.fromNullable(watermark);
+  }
+
   @Override
   public final void processElement1(
       StreamRecord<WindowedValue<InputT>> streamRecord) throws Exception {
@@ -315,22 +352,17 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     Iterable<WindowedValue<InputT>> justPushedBack =
         pushbackDoFnRunner.processElementInReadyWindows(streamRecord.getValue());
 
-    ListState<WindowedValue<InputT>> pushedBack =
-        sideInputStateBackend.getPartitionedState(
-            null,
-            VoidSerializer.INSTANCE,
-            pushedBackDescriptor);
+    BagState<WindowedValue<InputT>> pushedBack =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
-    ReducingState<Long> pushedBackWatermark =
-        sideInputStateBackend.getPartitionedState(
-            null,
-            VoidSerializer.INSTANCE,
-            pushedBackWatermarkDescriptor);
+    checkInitPushedBackWatermark();
 
+    long min = pushedBackWatermark.get();
     for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
-      pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
+      min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
       pushedBack.add(pushedBackValue);
     }
+    setPushedBackWatermark(min);
     pushbackDoFnRunner.finishBundle();
   }
 
@@ -346,15 +378,12 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     PCollectionView<?> sideInput = sideInputTagMapping.get(streamRecord.getValue().getUnionTag());
     sideInputHandler.addSideInputValue(sideInput, value);
 
-    ListState<WindowedValue<InputT>> pushedBack =
-        sideInputStateBackend.getPartitionedState(
-            null,
-            VoidSerializer.INSTANCE,
-            pushedBackDescriptor);
+    BagState<WindowedValue<InputT>> pushedBack =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
     List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
 
-    Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.get();
+    Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
     if (pushedBackContents != null) {
       for (WindowedValue<InputT> elem : pushedBackContents) {
 
@@ -368,19 +397,13 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       }
     }
 
-
-    ReducingState<Long> pushedBackWatermark =
-        sideInputStateBackend.getPartitionedState(
-            null,
-            VoidSerializer.INSTANCE,
-            pushedBackWatermarkDescriptor);
-
     pushedBack.clear();
-    pushedBackWatermark.clear();
+    long min = TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE);
     for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
-      pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
+      min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
       pushedBack.add(pushedBackValue);
     }
+    setPushedBackWatermark(min);
 
     pushbackDoFnRunner.finishBundle();
 
@@ -410,44 +433,116 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   }
 
   @Override
-  public StreamTaskState snapshotOperatorState(
-      long checkpointId,
-      long timestamp) throws Exception {
+  public void snapshotState(StateSnapshotContext context) throws Exception {
+    // copied from AbstractStreamOperator
+    if (getKeyedStateBackend() != null) {
+      KeyedStateCheckpointOutputStream out;
+
+      try {
+        out = context.getRawKeyedOperatorStateOutput();
+      } catch (Exception exception) {
+        throw new Exception("Could not open raw keyed operator state stream for "
+            + getOperatorName() + '.', exception);
+      }
+
+      try {
+        KeyGroupsList allKeyGroups = out.getKeyGroupList();
+        for (int keyGroupIdx : allKeyGroups) {
+          out.startNewKeyGroup(keyGroupIdx);
 
-    StreamTaskState streamTaskState = super.snapshotOperatorState(checkpointId, timestamp);
+          DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper(out);
 
-    if (sideInputStateBackend != null) {
-      // we have to manually checkpoint the side-input state backend and store
-      // the handle in the "user state" of the task state
-      HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> sideInputSnapshot =
-          sideInputStateBackend.snapshotPartitionedState(checkpointId, timestamp);
+          // if (this instanceof KeyGroupCheckpointedOperator)
+          snapshotKeyGroupState(keyGroupIdx, dov);
 
-      if (sideInputSnapshot != null) {
-        @SuppressWarnings("unchecked,rawtypes")
-        StateHandle<Serializable> sideInputStateHandle =
-            (StateHandle) sideInputStateBackend.checkpointStateSerializable(
-                sideInputSnapshot, checkpointId, timestamp);
+          // We can't get all timerServices, so we just snapshot our timerService
+          // Maybe this is a normal DoFn that has no timerService
+          if (keyCoder != null) {
+            timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
+          }
 
-        streamTaskState.setFunctionState(sideInputStateHandle);
+        }
+      } catch (Exception exception) {
+        throw new Exception("Could not write timer service of " + getOperatorName()
+            + " to checkpoint state stream.", exception);
+      } finally {
+        try {
+          out.close();
+        } catch (Exception closeException) {
+          LOG.warn("Could not close raw keyed operator state stream for {}. This "
+              + "might have prevented deleting some state data.", getOperatorName(),
+              closeException);
+        }
       }
     }
-
-    return streamTaskState;
   }
 
   @Override
-  public void restoreState(StreamTaskState state) throws Exception {
-    super.restoreState(state);
+  public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception {
+    if (!sideInputs.isEmpty() && keyCoder != null) {
+      ((FlinkKeyGroupStateInternals) pushbackStateInternals).snapshotKeyGroupState(
+          keyGroupIndex, out);
+    }
+  }
 
-    @SuppressWarnings("unchecked,rawtypes")
-    StateHandle<HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>>> sideInputStateHandle =
-        (StateHandle) state.getFunctionState();
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
+    if (getKeyedStateBackend() != null) {
+      int totalKeyGroups = getKeyedStateBackend().getNumberOfKeyGroups();
+      KeyGroupsList localKeyGroupRange = getKeyedStateBackend().getKeyGroupRange();
+
+      for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
+        DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
+
+        int keyGroupIdx = streamProvider.getKeyGroupId();
+        checkArgument(localKeyGroupRange.contains(keyGroupIdx),
+            "Key Group " + keyGroupIdx + " does not belong to the local range.");
+
+        // if (this instanceof KeyGroupRestoringOperator)
+        restoreKeyGroupState(keyGroupIdx, div);
+
+        // We just initialize our timerService
+        if (keyCoder != null) {
+          if (timerService == null) {
+            timerService = new HeapInternalTimerService<>(
+                totalKeyGroups,
+                localKeyGroupRange,
+                this,
+                getRuntimeContext().getProcessingTimeService());
+          }
+          timerService.restoreTimersForKeyGroup(div, keyGroupIdx, getUserCodeClassloader());
+        }
+      }
+    }
+  }
 
-    if (sideInputStateHandle != null) {
-      restoredSideInputState = sideInputStateHandle.getState(getUserCodeClassloader());
+  @Override
+  public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception {
+    if (!sideInputs.isEmpty() && keyCoder != null) {
+      if (pushbackStateInternals == null) {
+        pushbackStateInternals = new FlinkKeyGroupStateInternals<>(keyCoder,
+            getKeyedStateBackend());
+      }
+      ((FlinkKeyGroupStateInternals) pushbackStateInternals)
+          .restoreKeyGroupState(keyGroupIndex, in, getUserCodeClassloader());
     }
   }
 
+  @Override
+  public void onEventTime(InternalTimer<Object, TimerData> timer) throws Exception {
+    fireTimer(timer);
+  }
+
+  @Override
+  public void onProcessingTime(InternalTimer<Object, TimerData> timer) throws Exception {
+    fireTimer(timer);
+  }
+
+  public void fireTimer(InternalTimer<?, TimerData> timer) {
+    // Timers are not implemented for stateful ParDo yet.
+    throw new RuntimeException("fireTimer should not be invoked in DoFnOperator.");
+  }
+
   /**
    * Factory for creating an {@link DoFnRunners.OutputManager} from
    * a Flink {@link Output}.
@@ -505,16 +600,6 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   }
 
   /**
-   * For determining the pushback watermark in a {@link ReducingStateDescriptor}.
-   */
-  private static class LongMinReducer implements ReduceFunction<Long> {
-    @Override
-    public Long reduce(Long a, Long b) throws Exception {
-      return Math.min(a, b);
-    }
-  }
-
-  /**
   * {@link StepContext} for running {@link DoFn DoFns} on Flink. This does not allow
    * accessing state or timer internals.
    */
@@ -557,4 +642,77 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     }
   }
 
+  private class FlinkTimerInternals implements TimerInternals {
+
+    @Override
+    public void setTimer(
+        StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
+      setTimer(TimerData.of(timerId, namespace, target, timeDomain));
+    }
+
+    @Deprecated
+    @Override
+    public void setTimer(TimerData timerKey) {
+      long time = timerKey.getTimestamp().getMillis();
+      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
+        timerService.registerEventTimeTimer(timerKey, time);
+      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+        timerService.registerProcessingTimeTimer(timerKey, time);
+      } else {
+        throw new UnsupportedOperationException(
+            "Unsupported time domain: " + timerKey.getDomain());
+      }
+    }
+
+    @Deprecated
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId) {
+      throw new UnsupportedOperationException(
+          "Canceling of a timer by ID is not yet supported.");
+    }
+
+    @Override
+    public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+      throw new UnsupportedOperationException(
+          "Canceling of a timer by ID is not yet supported.");
+    }
+
+    @Deprecated
+    @Override
+    public void deleteTimer(TimerData timerKey) {
+      long time = timerKey.getTimestamp().getMillis();
+      if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
+        timerService.deleteEventTimeTimer(timerKey, time);
+      } else if (timerKey.getDomain().equals(TimeDomain.PROCESSING_TIME)) {
+        timerService.deleteProcessingTimeTimer(timerKey, time);
+      } else {
+        throw new UnsupportedOperationException(
+            "Unsupported time domain: " + timerKey.getDomain());
+      }
+    }
+
+    @Override
+    public Instant currentProcessingTime() {
+      return new Instant(timerService.currentProcessingTime());
+    }
+
+    @Nullable
+    @Override
+    public Instant currentSynchronizedProcessingTime() {
+      return new Instant(timerService.currentProcessingTime());
+    }
+
+    @Override
+    public Instant currentInputWatermarkTime() {
+      return new Instant(Math.min(currentInputWatermark, getPushbackWatermarkHold()));
+    }
+
+    @Nullable
+    @Override
+    public Instant currentOutputWatermarkTime() {
+      return new Instant(currentOutputWatermark);
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/7d32b93e/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
deleted file mode 100644
index 4183067..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkStateInternals.java
+++ /dev/null
@@ -1,1055 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.wrappers.streaming;
-
-import com.google.common.collect.Lists;
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.beam.runners.core.StateInternals;
-import org.apache.beam.runners.core.StateNamespace;
-import org.apache.beam.runners.core.StateTag;
-import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
-import org.apache.beam.sdk.coders.InstantCoder;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.CombineWithContext;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.CombineContextFactory;
-import org.apache.beam.sdk.util.state.AccumulatorCombiningState;
-import org.apache.beam.sdk.util.state.BagState;
-import org.apache.beam.sdk.util.state.MapState;
-import org.apache.beam.sdk.util.state.ReadableState;
-import org.apache.beam.sdk.util.state.SetState;
-import org.apache.beam.sdk.util.state.State;
-import org.apache.beam.sdk.util.state.StateContext;
-import org.apache.beam.sdk.util.state.StateContexts;
-import org.apache.beam.sdk.util.state.ValueState;
-import org.apache.beam.sdk.util.state.WatermarkHoldState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.runtime.state.AbstractStateBackend;
-import org.joda.time.Instant;
-
-/**
- * {@link StateInternals} that uses a Flink {@link AbstractStateBackend} to
- * manage state.
- *
- * <p>Note: In the Flink streaming runner the key is always encoded
- * using an {@link Coder} and stored in a {@link ByteBuffer}.
- */
-public class FlinkStateInternals<K> implements StateInternals<K> {
-
-  private final Coder<K> keyCoder;
-
-  private final AbstractStateBackend flinkStateBackend;
-
-  // on recovery, these will no be properly set because we don't
-  // know which watermark hold states there are in the Flink State Backend
-  private final Map<String, Instant> watermarkHolds = new HashMap<>();
-
-  public FlinkStateInternals(AbstractStateBackend flinkStateBackend, Coder<K> keyCoder) {
-    this.flinkStateBackend = flinkStateBackend;
-    this.keyCoder = keyCoder;
-  }
-
-  /**
-   * Returns the minimum over all watermark holds.
-   */
-  public Instant watermarkHold() {
-    long min = Long.MAX_VALUE;
-    for (Instant hold: watermarkHolds.values()) {
-      min = Math.min(min, hold.getMillis());
-    }
-    return new Instant(min);
-  }
-
-  @Override
-  public K getKey() {
-    ByteBuffer keyBytes = (ByteBuffer) flinkStateBackend.getCurrentKey();
-    try {
-      return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
-    } catch (CoderException e) {
-      throw new RuntimeException("Error decoding key.", e);
-    }
-  }
-
-  @Override
-  public <T extends State> T state(
-      final StateNamespace namespace,
-      StateTag<? super K, T> address) {
-
-    return state(namespace, address, StateContexts.nullContext());
-  }
-
-  @Override
-  public <T extends State> T state(
-      final StateNamespace namespace,
-      StateTag<? super K, T> address,
-      final StateContext<?> context) {
-
-    return address.bind(new StateTag.StateBinder<K>() {
-
-      @Override
-      public <T> ValueState<T> bindValue(
-          StateTag<? super K, ValueState<T>> address,
-          Coder<T> coder) {
-
-        return new FlinkValueState<>(flinkStateBackend, address, namespace, coder);
-      }
-
-      @Override
-      public <T> BagState<T> bindBag(
-          StateTag<? super K, BagState<T>> address,
-          Coder<T> elemCoder) {
-
-        return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder);
-      }
-
-      @Override
-      public <T> SetState<T> bindSet(
-          StateTag<? super K, SetState<T>> address,
-          Coder<T> elemCoder) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", SetState.class.getSimpleName()));
-      }
-
-      @Override
-      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
-          StateTag<? super K, MapState<KeyT, ValueT>> spec,
-          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
-        throw new UnsupportedOperationException(
-            String.format("%s is not supported", MapState.class.getSimpleName()));
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT>
-      bindCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
-
-        return new FlinkAccumulatorCombiningState<>(
-            flinkStateBackend, address, combineFn, namespace, accumCoder);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
-        return new FlinkKeyedAccumulatorCombiningState<>(
-            flinkStateBackend,
-            address,
-            combineFn,
-            namespace,
-            accumCoder,
-            FlinkStateInternals.this);
-      }
-
-      @Override
-      public <InputT, AccumT, OutputT>
-          AccumulatorCombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
-          StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-          Coder<AccumT> accumCoder,
-          CombineWithContext.KeyedCombineFnWithContext<
-              ? super K, InputT, AccumT, OutputT> combineFn) {
-        return new FlinkAccumulatorCombiningStateWithContext<>(
-            flinkStateBackend,
-            address,
-            combineFn,
-            namespace,
-            accumCoder,
-            FlinkStateInternals.this,
-            CombineContextFactory.createFromStateContext(context));
-      }
-
-      @Override
-      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
-          StateTag<? super K, WatermarkHoldState<W>> address,
-          OutputTimeFn<? super W> outputTimeFn) {
-
-        return new FlinkWatermarkHoldState<>(
-            flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn);
-      }
-    });
-  }
-
-  private static class FlinkValueState<K, T> implements ValueState<T> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, ValueState<T>> address;
-    private final ValueStateDescriptor<T> flinkStateDescriptor;
-    private final AbstractStateBackend flinkStateBackend;
-
-    FlinkValueState(
-        AbstractStateBackend flinkStateBackend,
-        StateTag<? super K, ValueState<T>> address,
-        StateNamespace namespace,
-        Coder<T> coder) {
-
-      this.namespace = namespace;
-      this.address = address;
-      this.flinkStateBackend = flinkStateBackend;
-
-      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
-    }
-
-    @Override
-    public void write(T input) {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).update(input);
-      } catch (Exception e) {
-        throw new RuntimeException("Error updating state.", e);
-      }
-    }
-
-    @Override
-    public ValueState<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public T read() {
-      try {
-        return flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).value();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkValueState<?, ?> that = (FlinkValueState<?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  private static class FlinkBagState<K, T> implements BagState<T> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, BagState<T>> address;
-    private final ListStateDescriptor<T> flinkStateDescriptor;
-    private final AbstractStateBackend flinkStateBackend;
-
-    FlinkBagState(
-        AbstractStateBackend flinkStateBackend,
-        StateTag<? super K, BagState<T>> address,
-        StateNamespace namespace,
-        Coder<T> coder) {
-
-      this.namespace = namespace;
-      this.address = address;
-      this.flinkStateBackend = flinkStateBackend;
-
-      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
-
-      flinkStateDescriptor = new ListStateDescriptor<>(address.getId(), typeInfo);
-    }
-
-    @Override
-    public void add(T input) {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).add(input);
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to bag state.", e);
-      }
-    }
-
-    @Override
-    public BagState<T> readLater() {
-      return this;
-    }
-
-    @Override
-    public Iterable<T> read() {
-      try {
-        Iterable<T> result = flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).get();
-
-        return result != null ? result : Collections.<T>emptyList();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            Iterable<T> result = flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).get();
-            return result == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkBagState<?, ?> that = (FlinkBagState<?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  private static class FlinkAccumulatorCombiningState<K, InputT, AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
-    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
-    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
-    private final AbstractStateBackend flinkStateBackend;
-
-    FlinkAccumulatorCombiningState(
-        AbstractStateBackend flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-        Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
-        StateNamespace namespace,
-        Coder<AccumT> accumCoder) {
-
-      this.namespace = namespace;
-      this.address = address;
-      this.combineFn = combineFn;
-      this.flinkStateBackend = flinkStateBackend;
-
-      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
-    }
-
-    @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void add(InputT value) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          current = combineFn.createAccumulator();
-        }
-        current = combineFn.addInput(current, value);
-        state.update(current);
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state." , e);
-      }
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-              namespace.stringKey(),
-              StringSerializer.INSTANCE,
-              flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          state.update(accum);
-        } else {
-          current = combineFn.mergeAccumulators(Lists.newArrayList(current, accum));
-          state.update(current);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
-      }
-    }
-
-    @Override
-    public AccumT getAccum() {
-      try {
-        return flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).value();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(accumulators);
-    }
-
-    @Override
-    public OutputT read() {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT accum = state.value();
-        if (accum != null) {
-          return combineFn.extractOutput(accum);
-        } else {
-          return combineFn.extractOutput(combineFn.createAccumulator());
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).value() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkAccumulatorCombiningState<?, ?, ?, ?> that =
-          (FlinkAccumulatorCombiningState<?, ?, ?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
-  private static class FlinkKeyedAccumulatorCombiningState<K, InputT, AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
-    private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
-    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
-    private final AbstractStateBackend flinkStateBackend;
-    private final FlinkStateInternals<K> flinkStateInternals;
-
-    FlinkKeyedAccumulatorCombiningState(
-        AbstractStateBackend flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-        Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
-        StateNamespace namespace,
-        Coder<AccumT> accumCoder,
-        FlinkStateInternals<K> flinkStateInternals) {
-
-      this.namespace = namespace;
-      this.address = address;
-      this.combineFn = combineFn;
-      this.flinkStateBackend = flinkStateBackend;
-      this.flinkStateInternals = flinkStateInternals;
-
-      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
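-      // The null default value distinguishes "never written" from a stored
-      // accumulator (see isEmpty() and read()).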
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
-    }
-
-    @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void add(InputT value) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          current = combineFn.createAccumulator(flinkStateInternals.getKey());
-        }
-        current = combineFn.addInput(flinkStateInternals.getKey(), current, value);
-        state.update(current);
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state." , e);
-      }
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          state.update(accum);
-        } else {
-          current = combineFn.mergeAccumulators(
-              flinkStateInternals.getKey(),
-              Lists.newArrayList(current, accum));
-          state.update(current);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
-      }
-    }
-
-    @Override
-    public AccumT getAccum() {
-      try {
-        return flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).value();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators);
-    }
-
-    @Override
-    public OutputT read() {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT accum = state.value();
-        if (accum != null) {
-          return combineFn.extractOutput(flinkStateInternals.getKey(), accum);
-        } else {
-          return combineFn.extractOutput(
-              flinkStateInternals.getKey(),
-              combineFn.createAccumulator(flinkStateInternals.getKey()));
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).value() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?> that =
-          (FlinkKeyedAccumulatorCombiningState<?, ?, ?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
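-  // Variant of the keyed combining state that additionally threads a
-  // CombineWithContext.Context through all KeyedCombineFnWithContext callbacks.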
-  private static class FlinkAccumulatorCombiningStateWithContext<K, InputT, AccumT, OutputT>
-      implements AccumulatorCombiningState<InputT, AccumT, OutputT> {
-
-    private final StateNamespace namespace;
-    private final StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address;
-    private final CombineWithContext.KeyedCombineFnWithContext<
-        ? super K, InputT, AccumT, OutputT> combineFn;
-    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
-    private final AbstractStateBackend flinkStateBackend;
-    private final FlinkStateInternals<K> flinkStateInternals;
-    private final CombineWithContext.Context context;
-
-    FlinkAccumulatorCombiningStateWithContext(
-        AbstractStateBackend flinkStateBackend,
-        StateTag<? super K, AccumulatorCombiningState<InputT, AccumT, OutputT>> address,
-        CombineWithContext.KeyedCombineFnWithContext<
-            ? super K, InputT, AccumT, OutputT> combineFn,
-        StateNamespace namespace,
-        Coder<AccumT> accumCoder,
-        FlinkStateInternals<K> flinkStateInternals,
-        CombineWithContext.Context context) {
-
-      this.namespace = namespace;
-      this.address = address;
-      this.combineFn = combineFn;
-      this.flinkStateBackend = flinkStateBackend;
-      this.flinkStateInternals = flinkStateInternals;
-      this.context = context;
-
-      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
-
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
-    }
-
-    @Override
-    public AccumulatorCombiningState<InputT, AccumT, OutputT> readLater() {
-      return this;
-    }
-
-    @Override
-    public void add(InputT value) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          current = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
-        }
-        current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context);
-        state.update(current);
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state." , e);
-      }
-    }
-
-    @Override
-    public void addAccum(AccumT accum) {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT current = state.value();
-        if (current == null) {
-          state.update(accum);
-        } else {
-          current = combineFn.mergeAccumulators(
-              flinkStateInternals.getKey(),
-              Lists.newArrayList(current, accum),
-              context);
-          state.update(current);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error adding to state.", e);
-      }
-    }
-
-    @Override
-    public AccumT getAccum() {
-      try {
-        return flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).value();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
-      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context);
-    }
-
-    @Override
-    public OutputT read() {
-      try {
-        org.apache.flink.api.common.state.ValueState<AccumT> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-
-        AccumT accum = state.value();
-        if (accum == null) {
-          // Guard against a missing accumulator, as the other combining-state
-          // variants do, instead of passing null to extractOutput.
-          accum = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
-        }
-        return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context);
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).value() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void clear() {
-      try {
-        flinkStateBackend.getPartitionedState(
-            namespace.stringKey(),
-            StringSerializer.INSTANCE,
-            flinkStateDescriptor).clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error clearing state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?> that =
-          (FlinkAccumulatorCombiningStateWithContext<?, ?, ?, ?>) o;
-
-      return namespace.equals(that.namespace) && address.equals(that.address);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = namespace.hashCode();
-      result = 31 * result + address.hashCode();
-      return result;
-    }
-  }
-
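-  // Watermark holds live in a Flink ValueState; every update is mirrored in
-  // FlinkStateInternals.watermarkHolds (keyed by namespace) so the enclosing
-  // internals can track outstanding holds without querying the state backend.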
-  private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
-      implements WatermarkHoldState<W> {
-    private final StateTag<? super K, WatermarkHoldState<W>> address;
-    private final OutputTimeFn<? super W> outputTimeFn;
-    private final StateNamespace namespace;
-    private final AbstractStateBackend flinkStateBackend;
-    private final FlinkStateInternals<K> flinkStateInternals;
-    private final ValueStateDescriptor<Instant> flinkStateDescriptor;
-
-    public FlinkWatermarkHoldState(
-        AbstractStateBackend flinkStateBackend,
-        FlinkStateInternals<K> flinkStateInternals,
-        StateTag<? super K, WatermarkHoldState<W>> address,
-        StateNamespace namespace,
-        OutputTimeFn<? super W> outputTimeFn) {
-      this.address = address;
-      this.outputTimeFn = outputTimeFn;
-      this.namespace = namespace;
-      this.flinkStateBackend = flinkStateBackend;
-      this.flinkStateInternals = flinkStateInternals;
-
-      CoderTypeInformation<Instant> typeInfo = new CoderTypeInformation<>(InstantCoder.of());
-      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
-    }
-
-    @Override
-    public OutputTimeFn<? super W> getOutputTimeFn() {
-      return outputTimeFn;
-    }
-
-    @Override
-    public WatermarkHoldState<W> readLater() {
-      return this;
-    }
-
-    @Override
-    public ReadableState<Boolean> isEmpty() {
-      return new ReadableState<Boolean>() {
-        @Override
-        public Boolean read() {
-          try {
-            return flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor).value() == null;
-          } catch (Exception e) {
-            throw new RuntimeException("Error reading state.", e);
-          }
-        }
-
-        @Override
-        public ReadableState<Boolean> readLater() {
-          return this;
-        }
-      };
-    }
-
-    @Override
-    public void add(Instant value) {
-      try {
-        org.apache.flink.api.common.state.ValueState<Instant> state =
-            flinkStateBackend.getPartitionedState(
-              namespace.stringKey(),
-              StringSerializer.INSTANCE,
-              flinkStateDescriptor);
-
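-        // Combine the new hold with any existing one using the OutputTimeFn,
-        // keeping the watermarkHolds side map in sync with the backend state.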
-        Instant current = state.value();
-        if (current == null) {
-          state.update(value);
-          flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value);
-        } else {
-          Instant combined = outputTimeFn.combine(current, value);
-          state.update(combined);
-          flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined);
-        }
-      } catch (Exception e) {
-        throw new RuntimeException("Error updating state.", e);
-      }
-    }
-
-    @Override
-    public Instant read() {
-      try {
-        org.apache.flink.api.common.state.ValueState<Instant> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-        return state.value();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public void clear() {
-      flinkStateInternals.watermarkHolds.remove(namespace.stringKey());
-      try {
-        org.apache.flink.api.common.state.ValueState<Instant> state =
-            flinkStateBackend.getPartitionedState(
-                namespace.stringKey(),
-                StringSerializer.INSTANCE,
-                flinkStateDescriptor);
-        state.clear();
-      } catch (Exception e) {
-        throw new RuntimeException("Error reading state.", e);
-      }
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      FlinkWatermarkHoldState<?, ?> that = (FlinkWatermarkHoldState<?, ?>) o;
-
-      if (!address.equals(that.address)) {
-        return false;
-      }
-      if (!outputTimeFn.equals(that.outputTimeFn)) {
-        return false;
-      }
-      return namespace.equals(that.namespace);
-    }
-
-    @Override
-    public int hashCode() {
-      int result = address.hashCode();
-      result = 31 * result + outputTimeFn.hashCode();
-      result = 31 * result + namespace.hashCode();
-      return result;
-    }
-  }
-}
