http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
new file mode 100644
index 0000000..2bf0bf1
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkSplitStateInternals.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import com.google.common.collect.Iterators;
+import java.util.Collections;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.MapState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.SetState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.OperatorStateBackend;
+
+/**
+ * {@link StateInternals} that uses a Flink {@link OperatorStateBackend}
+ * to manage the split-distribute state.
+ *
+ * <p>Elements in the {@code ListState} will be redistributed in round-robin
+ * fashion to operators when restarting with a different parallelism.
+ *
+ * <p>Note:
+ * The key and the namespace are ignored;
+ * only {@link BagState} is implemented.
+ */
+public class FlinkSplitStateInternals<K> implements StateInternals<K> {
+
+  private final OperatorStateBackend stateBackend;
+
+  public FlinkSplitStateInternals(OperatorStateBackend stateBackend) {
+    this.stateBackend = stateBackend;
+  }
+
+  @Override
+  public K getKey() {
+    return null;
+  }
+
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address) {
+
+    return state(namespace, address, StateContexts.nullContext());
+  }
+
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address,
+      final StateContext<?> context) {
+
+    return address.bind(new StateTag.StateBinder<K>() {
+
+      @Override
+      public <T> ValueState<T> bindValue(
+          StateTag<? super K, ValueState<T>> address,
+          Coder<T> coder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", ValueState.class.getSimpleName()));
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(
+          StateTag<? super K, BagState<T>> address,
+          Coder<T> elemCoder) {
+
+        return new FlinkSplitBagState<>(stateBackend, address, namespace, elemCoder);
+      }
+
+      @Override
+      public <T> SetState<T> bindSet(
+          StateTag<? super K, SetState<T>> address,
+          Coder<T> elemCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", SetState.class.getSimpleName()));
+      }
+
+      @Override
+      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+          StateTag<? super K, MapState<KeyT, ValueT>> spec,
+          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", MapState.class.getSimpleName()));
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT>
+      bindCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+        throw new UnsupportedOperationException("bindCombiningValue is not 
supported.");
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+        throw new UnsupportedOperationException("bindKeyedCombiningValue is not supported.");
+
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          CombineWithContext.KeyedCombineFnWithContext<
+              ? super K, InputT, AccumT, OutputT> combineFn) {
+        throw new UnsupportedOperationException(
+            "bindKeyedCombiningValueWithContext is not supported.");
+      }
+
+      @Override
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          StateTag<? super K, WatermarkHoldState<W>> address,
+          OutputTimeFn<? super W> outputTimeFn) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", WatermarkHoldState.class.getSimpleName()));
+      }
+    });
+  }
+
+  private static class FlinkSplitBagState<K, T> implements BagState<T> {
+
+    private final ListStateDescriptor<T> descriptor;
+    private OperatorStateBackend flinkStateBackend;
+    private final StateNamespace namespace;
+    private final StateTag<? super K, BagState<T>> address;
+
+    FlinkSplitBagState(
+        OperatorStateBackend flinkStateBackend,
+        StateTag<? super K, BagState<T>> address,
+        StateNamespace namespace,
+        Coder<T> coder) {
+      this.flinkStateBackend = flinkStateBackend;
+      this.namespace = namespace;
+      this.address = address;
+
+      CoderTypeInformation<T> typeInfo =
+          new CoderTypeInformation<>(coder);
+
+      descriptor = new ListStateDescriptor<>(address.getId(),
+          typeInfo.createSerializer(new ExecutionConfig()));
+    }
+
+    @Override
+    public void add(T input) {
+      try {
+        flinkStateBackend.getOperatorState(descriptor).add(input);
+      } catch (Exception e) {
+        throw new RuntimeException("Error updating state.", e);
+      }
+    }
+
+    @Override
+    public BagState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public Iterable<T> read() {
+      try {
+        Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
+        return result != null ? result : Collections.<T>emptyList();
+      } catch (Exception e) {
+        throw new RuntimeException("Error updating state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            Iterable<T> result = flinkStateBackend.getOperatorState(descriptor).get();
+            // PartitionableListState.get() returns an empty collection when there
+            // are no elements, whereas KeyedListState returns null.
+            return result == null || Iterators.size(result.iterator()) == 0;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      try {
+        flinkStateBackend.getOperatorState(descriptor).clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkSplitBagState<?, ?> that = (FlinkSplitBagState<?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+}
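
For illustration, a minimal usage sketch of the class above (not part of the patch). It assumes the {@code StateTags} and {@code StateNamespaces} helpers from org.apache.beam.runners.core and an OperatorStateBackend supplied by the enclosing Flink operator:

    import org.apache.beam.runners.core.StateNamespaces;
    import org.apache.beam.runners.core.StateTags;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.util.state.BagState;
    import org.apache.flink.runtime.state.OperatorStateBackend;

    public class SplitStateExample {
      // "backend" is assumed to come from the enclosing Flink operator.
      static void buffer(OperatorStateBackend backend) {
        FlinkSplitStateInternals<Void> internals = new FlinkSplitStateInternals<>(backend);
        BagState<String> bag = internals.state(
            StateNamespaces.global(), StateTags.bag("buffer", StringUtf8Coder.of()));
        bag.add("a");
        bag.add("b");
        // On restore with a different parallelism, Flink redistributes the
        // underlying ListState elements round-robin across operator instances.
        for (String element : bag.read()) {
          System.out.println(element);
        }
      }
    }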

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
new file mode 100644
index 0000000..4f961e5
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java
@@ -0,0 +1,1053 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import com.google.common.collect.Lists;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.beam.runners.core.StateInternals;
+import org.apache.beam.runners.core.StateNamespace;
+import org.apache.beam.runners.core.StateTag;
+import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.CombineWithContext;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.beam.sdk.util.CombineContextFactory;
+import org.apache.beam.sdk.util.state.BagState;
+import org.apache.beam.sdk.util.state.CombiningState;
+import org.apache.beam.sdk.util.state.MapState;
+import org.apache.beam.sdk.util.state.ReadableState;
+import org.apache.beam.sdk.util.state.SetState;
+import org.apache.beam.sdk.util.state.State;
+import org.apache.beam.sdk.util.state.StateContext;
+import org.apache.beam.sdk.util.state.StateContexts;
+import org.apache.beam.sdk.util.state.ValueState;
+import org.apache.beam.sdk.util.state.WatermarkHoldState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.state.KeyedStateBackend;
+import org.joda.time.Instant;
+
+/**
+ * {@link StateInternals} that uses a Flink {@link KeyedStateBackend} to manage state.
+ *
+ * <p>Note: In the Flink streaming runner the key is always encoded
+ * using a {@link Coder} and stored in a {@link ByteBuffer}.
+ */
+public class FlinkStateInternals<K> implements StateInternals<K> {
+
+  private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+  private Coder<K> keyCoder;
+
+  // On recovery these will not be properly set, because we don't
+  // know which watermark hold states there are in the Flink state backend.
+  private final Map<String, Instant> watermarkHolds = new HashMap<>();
+
+  public FlinkStateInternals(KeyedStateBackend<ByteBuffer> flinkStateBackend, Coder<K> keyCoder) {
+    this.flinkStateBackend = flinkStateBackend;
+    this.keyCoder = keyCoder;
+  }
+
+  /**
+   * Returns the minimum over all watermark holds.
+   */
+  public Instant watermarkHold() {
+    long min = Long.MAX_VALUE;
+    for (Instant hold: watermarkHolds.values()) {
+      min = Math.min(min, hold.getMillis());
+    }
+    return new Instant(min);
+  }
+
+  @Override
+  public K getKey() {
+    ByteBuffer keyBytes = flinkStateBackend.getCurrentKey();
+    try {
+      return CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
+    } catch (CoderException e) {
+      throw new RuntimeException("Error decoding key.", e);
+    }
+  }
+
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address) {
+
+    return state(namespace, address, StateContexts.nullContext());
+  }
+
+  @Override
+  public <T extends State> T state(
+      final StateNamespace namespace,
+      StateTag<? super K, T> address,
+      final StateContext<?> context) {
+
+    return address.bind(new StateTag.StateBinder<K>() {
+
+      @Override
+      public <T> ValueState<T> bindValue(
+          StateTag<? super K, ValueState<T>> address,
+          Coder<T> coder) {
+
+        return new FlinkValueState<>(flinkStateBackend, address, namespace, coder);
+      }
+
+      @Override
+      public <T> BagState<T> bindBag(
+          StateTag<? super K, BagState<T>> address,
+          Coder<T> elemCoder) {
+
+        return new FlinkBagState<>(flinkStateBackend, address, namespace, elemCoder);
+      }
+
+      @Override
+      public <T> SetState<T> bindSet(
+          StateTag<? super K, SetState<T>> address,
+          Coder<T> elemCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", SetState.class.getSimpleName()));
+      }
+
+      @Override
+      public <KeyT, ValueT> MapState<KeyT, ValueT> bindMap(
+          StateTag<? super K, MapState<KeyT, ValueT>> spec,
+          Coder<KeyT> mapKeyCoder, Coder<ValueT> mapValueCoder) {
+        throw new UnsupportedOperationException(
+            String.format("%s is not supported", MapState.class.getSimpleName()));
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT>
+      bindCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          Combine.CombineFn<InputT, AccumT, OutputT> combineFn) {
+
+        return new FlinkCombiningState<>(
+            flinkStateBackend, address, combineFn, namespace, accumCoder);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValue(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn) {
+        return new FlinkKeyedCombiningState<>(
+            flinkStateBackend,
+            address,
+            combineFn,
+            namespace,
+            accumCoder,
+            FlinkStateInternals.this);
+      }
+
+      @Override
+      public <InputT, AccumT, OutputT>
+      CombiningState<InputT, AccumT, OutputT> bindKeyedCombiningValueWithContext(
+          StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+          Coder<AccumT> accumCoder,
+          CombineWithContext.KeyedCombineFnWithContext<
+              ? super K, InputT, AccumT, OutputT> combineFn) {
+        return new FlinkCombiningStateWithContext<>(
+            flinkStateBackend,
+            address,
+            combineFn,
+            namespace,
+            accumCoder,
+            FlinkStateInternals.this,
+            CombineContextFactory.createFromStateContext(context));
+      }
+
+      @Override
+      public <W extends BoundedWindow> WatermarkHoldState<W> bindWatermark(
+          StateTag<? super K, WatermarkHoldState<W>> address,
+          OutputTimeFn<? super W> outputTimeFn) {
+
+        return new FlinkWatermarkHoldState<>(
+            flinkStateBackend, FlinkStateInternals.this, address, namespace, outputTimeFn);
+      }
+    });
+  }
+
+  private static class FlinkValueState<K, T> implements ValueState<T> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, ValueState<T>> address;
+    private final ValueStateDescriptor<T> flinkStateDescriptor;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+
+    FlinkValueState(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        StateTag<? super K, ValueState<T>> address,
+        StateNamespace namespace,
+        Coder<T> coder) {
+
+      this.namespace = namespace;
+      this.address = address;
+      this.flinkStateBackend = flinkStateBackend;
+
+      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
+
+      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+    }
+
+    @Override
+    public void write(T input) {
+      try {
+        flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).update(input);
+      } catch (Exception e) {
+        throw new RuntimeException("Error updating state.", e);
+      }
+    }
+
+    @Override
+    public ValueState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public T read() {
+      try {
+        return flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).value();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public void clear() {
+      try {
+        flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkValueState<?, ?> that = (FlinkValueState<?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+  private static class FlinkBagState<K, T> implements BagState<T> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, BagState<T>> address;
+    private final ListStateDescriptor<T> flinkStateDescriptor;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+
+    FlinkBagState(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        StateTag<? super K, BagState<T>> address,
+        StateNamespace namespace,
+        Coder<T> coder) {
+
+      this.namespace = namespace;
+      this.address = address;
+      this.flinkStateBackend = flinkStateBackend;
+
+      CoderTypeInformation<T> typeInfo = new CoderTypeInformation<>(coder);
+
+      flinkStateDescriptor = new ListStateDescriptor<>(address.getId(), typeInfo);
+    }
+
+    @Override
+    public void add(T input) {
+      try {
+        flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).add(input);
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to bag state.", e);
+      }
+    }
+
+    @Override
+    public BagState<T> readLater() {
+      return this;
+    }
+
+    @Override
+    public Iterable<T> read() {
+      try {
+        Iterable<T> result = flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).get();
+
+        return result != null ? result : Collections.<T>emptyList();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            Iterable<T> result = flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor).get();
+            return result == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      try {
+        flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkBagState<?, ?> that = (FlinkBagState<?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+  private static class FlinkCombiningState<K, InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final Combine.CombineFn<InputT, AccumT, OutputT> combineFn;
+    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+
+    FlinkCombiningState(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        Combine.CombineFn<InputT, AccumT, OutputT> combineFn,
+        StateNamespace namespace,
+        Coder<AccumT> accumCoder) {
+
+      this.namespace = namespace;
+      this.address = address;
+      this.combineFn = combineFn;
+      this.flinkStateBackend = flinkStateBackend;
+
+      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
+
+      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+    }
+
+    @Override
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
+      return this;
+    }
+
+    @Override
+    public void add(InputT value) {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state =
+            flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor);
+
+        AccumT current = state.value();
+        if (current == null) {
+          current = combineFn.createAccumulator();
+        }
+        current = combineFn.addInput(current, value);
+        state.update(current);
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state." , e);
+      }
+    }
+
+    @Override
+    public void addAccum(AccumT accum) {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state =
+            flinkStateBackend.getPartitionedState(
+              namespace.stringKey(),
+              StringSerializer.INSTANCE,
+              flinkStateDescriptor);
+
+        AccumT current = state.value();
+        if (current == null) {
+          state.update(accum);
+        } else {
+          current = combineFn.mergeAccumulators(Lists.newArrayList(current, accum));
+          state.update(current);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state.", e);
+      }
+    }
+
+    @Override
+    public AccumT getAccum() {
+      try {
+        return flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).value();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(accumulators);
+    }
+
+    @Override
+    public OutputT read() {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state =
+            flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor);
+
+        AccumT accum = state.value();
+        if (accum != null) {
+          return combineFn.extractOutput(accum);
+        } else {
+          return combineFn.extractOutput(combineFn.createAccumulator());
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            return flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor).value() == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      try {
+        flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkCombiningState<?, ?, ?, ?> that =
+          (FlinkCombiningState<?, ?, ?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+  private static class FlinkKeyedCombiningState<K, InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn;
+    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+    private final FlinkStateInternals<K> flinkStateInternals;
+
+    FlinkKeyedCombiningState(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        Combine.KeyedCombineFn<? super K, InputT, AccumT, OutputT> combineFn,
+        StateNamespace namespace,
+        Coder<AccumT> accumCoder,
+        FlinkStateInternals<K> flinkStateInternals) {
+
+      this.namespace = namespace;
+      this.address = address;
+      this.combineFn = combineFn;
+      this.flinkStateBackend = flinkStateBackend;
+      this.flinkStateInternals = flinkStateInternals;
+
+      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
+
+      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+    }
+
+    @Override
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
+      return this;
+    }
+
+    @Override
+    public void add(InputT value) {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state =
+            flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor);
+
+        AccumT current = state.value();
+        if (current == null) {
+          current = combineFn.createAccumulator(flinkStateInternals.getKey());
+        }
+        current = combineFn.addInput(flinkStateInternals.getKey(), current, value);
+        state.update(current);
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state." , e);
+      }
+    }
+
+    @Override
+    public void addAccum(AccumT accum) {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state =
+            flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor);
+
+        AccumT current = state.value();
+        if (current == null) {
+          state.update(accum);
+        } else {
+          current = combineFn.mergeAccumulators(
+              flinkStateInternals.getKey(),
+              Lists.newArrayList(current, accum));
+          state.update(current);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state.", e);
+      }
+    }
+
+    @Override
+    public AccumT getAccum() {
+      try {
+        return flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).value();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators);
+    }
+
+    @Override
+    public OutputT read() {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state =
+            flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor);
+
+        AccumT accum = state.value();
+        if (accum != null) {
+          return combineFn.extractOutput(flinkStateInternals.getKey(), accum);
+        } else {
+          return combineFn.extractOutput(
+              flinkStateInternals.getKey(),
+              combineFn.createAccumulator(flinkStateInternals.getKey()));
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            return flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor).value() == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      try {
+        flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkKeyedCombiningState<?, ?, ?, ?> that =
+          (FlinkKeyedCombiningState<?, ?, ?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+  private static class FlinkCombiningStateWithContext<K, InputT, AccumT, OutputT>
+      implements CombiningState<InputT, AccumT, OutputT> {
+
+    private final StateNamespace namespace;
+    private final StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address;
+    private final CombineWithContext.KeyedCombineFnWithContext<
+        ? super K, InputT, AccumT, OutputT> combineFn;
+    private final ValueStateDescriptor<AccumT> flinkStateDescriptor;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+    private final FlinkStateInternals<K> flinkStateInternals;
+    private final CombineWithContext.Context context;
+
+    FlinkCombiningStateWithContext(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        StateTag<? super K, CombiningState<InputT, AccumT, OutputT>> address,
+        CombineWithContext.KeyedCombineFnWithContext<
+            ? super K, InputT, AccumT, OutputT> combineFn,
+        StateNamespace namespace,
+        Coder<AccumT> accumCoder,
+        FlinkStateInternals<K> flinkStateInternals,
+        CombineWithContext.Context context) {
+
+      this.namespace = namespace;
+      this.address = address;
+      this.combineFn = combineFn;
+      this.flinkStateBackend = flinkStateBackend;
+      this.flinkStateInternals = flinkStateInternals;
+      this.context = context;
+
+      CoderTypeInformation<AccumT> typeInfo = new CoderTypeInformation<>(accumCoder);
+
+      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+    }
+
+    @Override
+    public CombiningState<InputT, AccumT, OutputT> readLater() {
+      return this;
+    }
+
+    @Override
+    public void add(InputT value) {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state =
+            flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor);
+
+        AccumT current = state.value();
+        if (current == null) {
+          current = combineFn.createAccumulator(flinkStateInternals.getKey(), context);
+        }
+        current = combineFn.addInput(flinkStateInternals.getKey(), current, value, context);
+        state.update(current);
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state." , e);
+      }
+    }
+
+    @Override
+    public void addAccum(AccumT accum) {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state =
+            flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor);
+
+        AccumT current = state.value();
+        if (current == null) {
+          state.update(accum);
+        } else {
+          current = combineFn.mergeAccumulators(
+              flinkStateInternals.getKey(),
+              Lists.newArrayList(current, accum),
+              context);
+          state.update(current);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error adding to state.", e);
+      }
+    }
+
+    @Override
+    public AccumT getAccum() {
+      try {
+        return flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).value();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public AccumT mergeAccumulators(Iterable<AccumT> accumulators) {
+      return combineFn.mergeAccumulators(flinkStateInternals.getKey(), accumulators, context);
+    }
+
+    @Override
+    public OutputT read() {
+      try {
+        org.apache.flink.api.common.state.ValueState<AccumT> state =
+            flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor);
+
+        AccumT accum = state.value();
+        return combineFn.extractOutput(flinkStateInternals.getKey(), accum, context);
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            return flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor).value() == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+    }
+
+    @Override
+    public void clear() {
+      try {
+        flinkStateBackend.getPartitionedState(
+            namespace.stringKey(),
+            StringSerializer.INSTANCE,
+            flinkStateDescriptor).clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error clearing state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkCombiningStateWithContext<?, ?, ?, ?> that =
+          (FlinkCombiningStateWithContext<?, ?, ?, ?>) o;
+
+      return namespace.equals(that.namespace) && address.equals(that.address);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = namespace.hashCode();
+      result = 31 * result + address.hashCode();
+      return result;
+    }
+  }
+
+  private static class FlinkWatermarkHoldState<K, W extends BoundedWindow>
+      implements WatermarkHoldState<W> {
+    private final StateTag<? super K, WatermarkHoldState<W>> address;
+    private final OutputTimeFn<? super W> outputTimeFn;
+    private final StateNamespace namespace;
+    private final KeyedStateBackend<ByteBuffer> flinkStateBackend;
+    private final FlinkStateInternals<K> flinkStateInternals;
+    private final ValueStateDescriptor<Instant> flinkStateDescriptor;
+
+    public FlinkWatermarkHoldState(
+        KeyedStateBackend<ByteBuffer> flinkStateBackend,
+        FlinkStateInternals<K> flinkStateInternals,
+        StateTag<? super K, WatermarkHoldState<W>> address,
+        StateNamespace namespace,
+        OutputTimeFn<? super W> outputTimeFn) {
+      this.address = address;
+      this.outputTimeFn = outputTimeFn;
+      this.namespace = namespace;
+      this.flinkStateBackend = flinkStateBackend;
+      this.flinkStateInternals = flinkStateInternals;
+
+      CoderTypeInformation<Instant> typeInfo = new CoderTypeInformation<>(InstantCoder.of());
+      flinkStateDescriptor = new ValueStateDescriptor<>(address.getId(), typeInfo, null);
+    }
+
+    @Override
+    public OutputTimeFn<? super W> getOutputTimeFn() {
+      return outputTimeFn;
+    }
+
+    @Override
+    public WatermarkHoldState<W> readLater() {
+      return this;
+    }
+
+    @Override
+    public ReadableState<Boolean> isEmpty() {
+      return new ReadableState<Boolean>() {
+        @Override
+        public Boolean read() {
+          try {
+            return flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor).value() == null;
+          } catch (Exception e) {
+            throw new RuntimeException("Error reading state.", e);
+          }
+        }
+
+        @Override
+        public ReadableState<Boolean> readLater() {
+          return this;
+        }
+      };
+
+    }
+
+    @Override
+    public void add(Instant value) {
+      try {
+        org.apache.flink.api.common.state.ValueState<Instant> state =
+            flinkStateBackend.getPartitionedState(
+              namespace.stringKey(),
+              StringSerializer.INSTANCE,
+              flinkStateDescriptor);
+
+        Instant current = state.value();
+        if (current == null) {
+          state.update(value);
+          flinkStateInternals.watermarkHolds.put(namespace.stringKey(), value);
+        } else {
+          Instant combined = outputTimeFn.combine(current, value);
+          state.update(combined);
+          flinkStateInternals.watermarkHolds.put(namespace.stringKey(), combined);
+        }
+      } catch (Exception e) {
+        throw new RuntimeException("Error updating state.", e);
+      }
+    }
+
+    @Override
+    public Instant read() {
+      try {
+        org.apache.flink.api.common.state.ValueState<Instant> state =
+            flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor);
+        return state.value();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public void clear() {
+      flinkStateInternals.watermarkHolds.remove(namespace.stringKey());
+      try {
+        org.apache.flink.api.common.state.ValueState<Instant> state =
+            flinkStateBackend.getPartitionedState(
+                namespace.stringKey(),
+                StringSerializer.INSTANCE,
+                flinkStateDescriptor);
+        state.clear();
+      } catch (Exception e) {
+        throw new RuntimeException("Error reading state.", e);
+      }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      FlinkWatermarkHoldState<?, ?> that = (FlinkWatermarkHoldState<?, ?>) o;
+
+      if (!address.equals(that.address)) {
+        return false;
+      }
+      if (!outputTimeFn.equals(that.outputTimeFn)) {
+        return false;
+      }
+      return namespace.equals(that.namespace);
+
+    }
+
+    @Override
+    public int hashCode() {
+      int result = address.hashCode();
+      result = 31 * result + outputTimeFn.hashCode();
+      result = 31 * result + namespace.hashCode();
+      return result;
+    }
+  }
+}
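
For illustration, a minimal sketch (not part of the patch) of binding a value cell through the class above. It assumes the {@code StateTags}/{@code StateNamespaces} helpers from org.apache.beam.runners.core, and that the backend's current key has already been set by the operator:

    import java.nio.ByteBuffer;
    import org.apache.beam.runners.core.StateNamespaces;
    import org.apache.beam.runners.core.StateTags;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.coders.VarIntCoder;
    import org.apache.beam.sdk.util.state.ValueState;
    import org.apache.flink.runtime.state.KeyedStateBackend;

    public class KeyedStateExample {
      // "backend" is assumed to be the operator's keyed state backend with
      // a current key already set for the element being processed.
      static void increment(KeyedStateBackend<ByteBuffer> backend) {
        FlinkStateInternals<String> internals =
            new FlinkStateInternals<>(backend, StringUtf8Coder.of());
        ValueState<Integer> count = internals.state(
            StateNamespaces.global(), StateTags.value("count", VarIntCoder.of()));
        Integer current = count.read();  // null before the first write
        count.write(current == null ? 1 : current + 1);
      }
    }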

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java
new file mode 100644
index 0000000..b38a520
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupCheckpointedOperator.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import java.io.DataOutputStream;
+
+/**
+ * This interface is used to checkpoint key-groups state.
+ */
+public interface KeyGroupCheckpointedOperator extends KeyGroupRestoringOperator {
+  /**
+   * Snapshots the state for a given {@code keyGroupIndex}.
+   *
+   * <p>{@code AbstractStreamOperator} calls this hook from its
+   * {@code snapshotState()} method while iterating over the key groups.
+   * @param keyGroupIndex the id of the key-group to be put in the snapshot.
+   * @param out the stream to write to.
+   */
+  void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
new file mode 100644
index 0000000..2bdfc6e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/KeyGroupRestoringOperator.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;
+
+import java.io.DataInputStream;
+
+/**
+ * This interface is used to restore key-groups state.
+ */
+public interface KeyGroupRestoringOperator {
+  /**
+   * Restore the state for a given {@code keyGroupIndex}.
+   * @param keyGroupIndex the id of the key-group whose state is to be restored.
+   * @param in the stream to read from.
+   */
+  void restoreKeyGroupState(int keyGroupIndex, DataInputStream in) throws Exception;
+}
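
Together the two interfaces describe a symmetric snapshot/restore contract: whatever is written per key group must be read back in the same order. A minimal sketch (hypothetical class and field names) of an operator helper that checkpoints one counter per key group:

    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.util.HashMap;
    import java.util.Map;

    public class KeyGroupCounters implements KeyGroupCheckpointedOperator {

      private final Map<Integer, Long> counters = new HashMap<>();

      @Override
      public void snapshotKeyGroupState(int keyGroupIndex, DataOutputStream out)
          throws Exception {
        // Invoked once per key group while the enclosing operator snapshots state.
        Long count = counters.get(keyGroupIndex);
        out.writeLong(count == null ? 0L : count);
      }

      @Override
      public void restoreKeyGroupState(int keyGroupIndex, DataInputStream in)
          throws Exception {
        // Mirror of the snapshot: read back exactly what was written above.
        counters.put(keyGroupIndex, in.readLong());
      }
    }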

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
new file mode 100644
index 0000000..0004e9e
--- /dev/null
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Internal state implementation of the Beam runner for Apache Flink.
+ */
+package org.apache.beam.runners.flink.translation.wrappers.streaming.state;

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/resources/log4j.properties b/runners/flink/src/main/resources/log4j.properties
new file mode 100644
index 0000000..4b6a708
--- /dev/null
+++ b/runners/flink/src/main/resources/log4j.properties
@@ -0,0 +1,23 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+log4j.rootLogger=OFF,console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n
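
With this configuration the root logger is OFF, so nothing is emitted by default. A hypothetical snippet showing where output would go if the level were raised (e.g. log4j.rootLogger=INFO,console): messages would then appear on System.err, formatted by the ConversionPattern above as something like "17/03/15 10:42:01 INFO flink.Example: starting".

    import org.apache.log4j.Logger;

    public class Example {
      private static final Logger LOG = Logger.getLogger(Example.class);

      public static void main(String[] args) {
        LOG.info("starting");  // suppressed under rootLogger=OFF
      }
    }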

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
new file mode 100644
index 0000000..10d6d9d
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/EncodedValueComparatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.runners.flink.translation.types.EncodedValueComparator;
+import org.apache.beam.runners.flink.translation.types.EncodedValueTypeInformation;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.ComparatorTestBase;
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.junit.Assert;
+
+/**
+ * Test for {@link EncodedValueComparator}.
+ */
+public class EncodedValueComparatorTest extends ComparatorTestBase<byte[]> {
+
+  @Override
+  protected TypeComparator<byte[]> createComparator(boolean ascending) {
+    return new EncodedValueTypeInformation().createComparator(ascending, new ExecutionConfig());
+  }
+
+  @Override
+  protected TypeSerializer<byte[]> createSerializer() {
+    return new EncodedValueTypeInformation().createSerializer(new ExecutionConfig());
+  }
+
+  @Override
+  protected void deepEquals(String message, byte[] should, byte[] is) {
+    Assert.assertArrayEquals(message, should, is);
+  }
+
+  @Override
+  protected byte[][] getSortedTestData() {
+    StringUtf8Coder coder = StringUtf8Coder.of();
+
+    try {
+      return new byte[][]{
+          CoderUtils.encodeToByteArray(coder, ""),
+          CoderUtils.encodeToByteArray(coder, "Lorem Ipsum Dolor Omit Longer"),
+          CoderUtils.encodeToByteArray(coder, "aaaa"),
+          CoderUtils.encodeToByteArray(coder, "abcd"),
+          CoderUtils.encodeToByteArray(coder, "abce"),
+          CoderUtils.encodeToByteArray(coder, "abdd"),
+          CoderUtils.encodeToByteArray(coder, "accd"),
+          CoderUtils.encodeToByteArray(coder, "bbcd")
+      };
+    } catch (CoderException e) {
+      throw new RuntimeException("Could not encode values.", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
new file mode 100644
index 0000000..d9d174c
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.Test;
+
+/**
+ * Tests the proper registration of the Flink runner.
+ */
+public class FlinkRunnerRegistrarTest {
+
+  @Test
+  public void testFullName() {
+    String[] args =
+        new String[] {String.format("--runner=%s", FlinkRunner.class.getName())};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    assertEquals(FlinkRunner.class, opts.getRunner());
+  }
+
+  @Test
+  public void testClassName() {
+    String[] args =
+        new String[] {String.format("--runner=%s", FlinkRunner.class.getSimpleName())};
+    PipelineOptions opts = PipelineOptionsFactory.fromArgs(args).create();
+    assertEquals(FlinkRunner.class, opts.getRunner());
+  }
+
+}
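
The lookup by simple class name in testClassName works because runners are registered with PipelineOptionsFactory through Java's ServiceLoader mechanism. A hedged sketch of that registration pattern follows; the actual FlinkRunnerRegistrar in this commit may differ in structure, and the nested class name here is illustrative only.

    @AutoService(PipelineRunnerRegistrar.class)
    public static class Runner implements PipelineRunnerRegistrar {
      @Override
      public Iterable<Class<? extends PipelineRunner<?>>> getPipelineRunners() {
        // Publishing the runner class lets both --runner=FlinkRunner (simple name)
        // and the fully qualified class name resolve via PipelineOptionsFactory.
        return ImmutableList.<Class<? extends PipelineRunner<?>>>of(FlinkRunner.class);
      }
    }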

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
new file mode 100644
index 0000000..d6240c4
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.runners.PipelineRunner;
+
+/**
+ * {@link org.apache.beam.sdk.Pipeline} for testing Beam pipelines on the
+ * {@link FlinkRunner}.
+ */
+public class FlinkTestPipeline extends Pipeline {
+
+  /**
+   * Creates and returns a new test pipeline for batch execution.
+   *
+   * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call
+   * {@link Pipeline#run} to execute the pipeline and check the tests.
+   */
+  public static FlinkTestPipeline createForBatch() {
+    return create(false);
+  }
+
+  /**
+   * Creates and returns a new test pipeline for streaming execution.
+   *
+   * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call
+   * {@link Pipeline#run} to execute the pipeline and check the tests.
+   *
+   * @return The Test Pipeline
+   */
+  public static FlinkTestPipeline createForStreaming() {
+    return create(true);
+  }
+
+  /**
+   * Creates and returns a new test pipeline for streaming or batch execution.
+   *
+   * <p>Use {@link org.apache.beam.sdk.testing.PAssert} to add tests, then call
+   * {@link Pipeline#run} to execute the pipeline and check the tests.
+   *
+   * @param streaming <code>true</code> for streaming mode, <code>false</code> for batch.
+   * @return The Test Pipeline.
+   */
+  private static FlinkTestPipeline create(boolean streaming) {
+    TestFlinkRunner flinkRunner = TestFlinkRunner.create(streaming);
+    return new FlinkTestPipeline(flinkRunner, flinkRunner.getPipelineOptions());
+  }
+
+  private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
+              PipelineOptions options) {
+    super(runner, options);
+  }
+}
+
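
A minimal usage sketch of the pattern the Javadoc above describes, assuming the standard Create, PCollection, and PAssert imports from the Beam SDK:

    FlinkTestPipeline p = FlinkTestPipeline.createForBatch();
    PCollection<String> output = p.apply(Create.of("a", "b", "c"));
    // PAssert installs verification transforms; run() executes them on the Flink runner.
    PAssert.that(output).containsInAnyOrder("a", "b", "c");
    p.run();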

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
new file mode 100644
index 0000000..06187f6
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Collections;
+import java.util.HashMap;
+import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests for serialization and deserialization of {@link PipelineOptions} in {@link DoFnOperator}.
+ */
+public class PipelineOptionsTest {
+
+  /**
+   * Pipeline options.
+   */
+  public interface MyOptions extends FlinkPipelineOptions {
+    @Description("Bla bla bla")
+    @Default.String("Hello")
+    String getTestOption();
+    void setTestOption(String value);
+  }
+
+  private static MyOptions options;
+  private static SerializedPipelineOptions serializedOptions;
+
+  private static final String[] args = new String[]{"--testOption=nothing"};
+
+  @BeforeClass
+  public static void beforeTest() {
+    options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
+    serializedOptions = new SerializedPipelineOptions(options);
+  }
+
+  @Test
+  public void testDeserialization() {
+    MyOptions deserializedOptions = serializedOptions.getPipelineOptions().as(MyOptions.class);
+    assertEquals("nothing", deserializedOptions.getTestOption());
+  }
+
+  @Test
+  public void testIgnoredFieldSerialization() {
+    FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
+    options.setStateBackend(new MemoryStateBackend());
+
+    FlinkPipelineOptions deserialized =
+        new SerializedPipelineOptions(options).getPipelineOptions().as(FlinkPipelineOptions.class);
+
+    assertNull(deserialized.getStateBackend());
+  }
+
+  @Test
+  public void testCaching() {
+    PipelineOptions deserializedOptions =
+        serializedOptions.getPipelineOptions().as(PipelineOptions.class);
+
+    assertNotNull(deserializedOptions);
+    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
+    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
+    assertTrue(deserializedOptions == serializedOptions.getPipelineOptions());
+  }
+
+  @Test(expected = Exception.class)
+  public void testNonNull() {
+    new SerializedPipelineOptions(null);
+  }
+
+  @Test(expected = Exception.class)
+  public void parDoBaseClassPipelineOptionsNullTest() {
+    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+        new TestDoFn(),
+        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
+        new TupleTag<String>("main-output"),
+        Collections.<TupleTag<?>>emptyList(),
+        new DoFnOperator.DefaultOutputManagerFactory<String>(),
+        WindowingStrategy.globalDefault(),
+        new HashMap<Integer, PCollectionView<?>>(),
+        Collections.<PCollectionView<?>>emptyList(),
+        null,
+        null);
+
+  }
+
+  /**
+   * Tests that PipelineOptions are present after serialization.
+   */
+  @Test
+  public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception {
+
+    DoFnOperator<String, String, String> doFnOperator = new DoFnOperator<>(
+        new TestDoFn(),
+        WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()),
+        new TupleTag<String>("main-output"),
+        Collections.<TupleTag<?>>emptyList(),
+        new DoFnOperator.DefaultOutputManagerFactory<String>(),
+        WindowingStrategy.globalDefault(),
+        new HashMap<Integer, PCollectionView<?>>(),
+        Collections.<PCollectionView<?>>emptyList(),
+        options,
+        null);
+
+    final byte[] serialized = SerializationUtils.serialize(doFnOperator);
+
+    @SuppressWarnings("unchecked")
+    DoFnOperator<Object, Object, Object> deserialized =
+        (DoFnOperator<Object, Object, Object>) 
SerializationUtils.deserialize(serialized);
+
+    TypeInformation<WindowedValue<Object>> typeInformation = 
TypeInformation.of(
+        new TypeHint<WindowedValue<Object>>() {});
+
+    OneInputStreamOperatorTestHarness<WindowedValue<Object>, Object> 
testHarness =
+        new OneInputStreamOperatorTestHarness<>(deserialized,
+            typeInformation.createSerializer(new ExecutionConfig()));
+
+    testHarness.open();
+
+    // execute once to access options
+    testHarness.processElement(new StreamRecord<>(
+        WindowedValue.of(
+            new Object(),
+            Instant.now(),
+            GlobalWindow.INSTANCE,
+            PaneInfo.NO_FIRING)));
+
+    testHarness.close();
+  }
+
+  private static class TestDoFn extends DoFn<String, String> {
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      Assert.assertNotNull(c.getPipelineOptions());
+      Assert.assertEquals(
+          options.getTestOption(),
+          c.getPipelineOptions().as(MyOptions.class).getTestOption());
+    }
+  }
+}
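
For reference, a compact sketch of the round trip these tests exercise, assuming (as testCaching above implies) that SerializedPipelineOptions is itself Java-serializable and caches the deserialized options:

    MyOptions original = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
    SerializedPipelineOptions holder = new SerializedPipelineOptions(original);

    // Simulate shipping the holder to a task manager via Java serialization.
    byte[] bytes = SerializationUtils.serialize(holder);
    SerializedPipelineOptions restored =
        (SerializedPipelineOptions) SerializationUtils.deserialize(bytes);

    // Option values survive the round trip...
    assertEquals("nothing", restored.getPipelineOptions().as(MyOptions.class).getTestOption());
    // ...and repeated calls return the same cached instance.
    assertTrue(restored.getPipelineOptions() == restored.getPipelineOptions());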

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
new file mode 100644
index 0000000..44c9017
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.base.Joiner;
+import java.io.File;
+import java.net.URI;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Reads from a bounded source in batch execution.
+ */
+public class ReadSourceITCase extends JavaProgramTestBase {
+
+  protected String resultPath;
+
+  public ReadSourceITCase() {
+  }
+
+  private static final String[] EXPECTED_RESULT = new String[] {
+     "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+
+    // We need to create the output directory up front;
+    // otherwise the Beam sinks used in these tests fail.
+
+    if (!new File(new URI(resultPath)).mkdirs()) {
+      throw new RuntimeException("Could not create output dir.");
+    }
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<String> result = p
+        .apply(CountingInput.upTo(10))
+        .apply(ParDo.of(new DoFn<Long, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) throws Exception {
+            c.output(c.element().toString());
+          }
+        }));
+
+    result.apply(TextIO.Write.to(new URI(resultPath).getPath() + "/part"));
+
+    p.run();
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
new file mode 100644
index 0000000..79b7882
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink;
+
+import com.google.common.base.Joiner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.TextIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.flink.streaming.util.StreamingProgramTestBase;
+
+/**
+ * Reads from a bounded source in streaming execution.
+ */
+public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
+
+  protected String resultPath;
+
+  public ReadSourceStreamingITCase() {
+  }
+
+  private static final String[] EXPECTED_RESULT = new String[] {
+     "0", "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) {
+
+    Pipeline p = FlinkTestPipeline.createForStreaming();
+
+    p
+      .apply(CountingInput.upTo(10))
+      .apply(ParDo.of(new DoFn<Long, String>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) throws Exception {
+            c.output(c.element().toString());
+          }
+        }))
+      .apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/beam/blob/cdd2544b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
new file mode 100644
index 0000000..38b790e
--- /dev/null
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.flink;
+
+import static org.junit.Assert.assertNotNull;
+
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.io.Sink;
+import org.apache.beam.sdk.io.Write;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.test.util.JavaProgramTestBase;
+
+/**
+ * Tests the translation of custom Write sinks.
+ */
+public class WriteSinkITCase extends JavaProgramTestBase {
+
+  protected String resultPath;
+
+  public WriteSinkITCase() {
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "Joe red 3", "Mary blue 4", "Max yellow 23"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result-" + System.nanoTime());
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  @Override
+  public void stopCluster() throws Exception {
+    try {
+      super.stopCluster();
+    } catch (final IOException ioe) {
+      if (ioe.getMessage().startsWith("Unable to delete file")) {
+        // That's fine for the test itself; the OS is just interfering during the cleanup phase.
+      } else {
+        throw ioe;
+      }
+    }
+  }
+
+  private static void runProgram(String resultPath) {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    p.apply(Create.of(ImmutableList.copyOf(EXPECTED_RESULT))).setCoder(StringUtf8Coder.of())
+      .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
+
+    p.run();
+  }
+
+  /**
+   * Simple custom sink which writes to a file.
+   */
+  private static class MyCustomSink extends Sink<String> {
+
+    private final String resultPath;
+
+    public MyCustomSink(String resultPath) {
+      this.resultPath = resultPath;
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      assertNotNull(options);
+    }
+
+    @Override
+    public WriteOperation<String, ?> createWriteOperation(PipelineOptions 
options) {
+      return new MyWriteOperation();
+    }
+
+    private class MyWriteOperation extends WriteOperation<String, String> {
+
+      @Override
+      public Coder<String> getWriterResultCoder() {
+        return StringUtf8Coder.of();
+      }
+
+      @Override
+      public void initialize(PipelineOptions options) throws Exception {
+
+      }
+
+      @Override
+      public void setWindowedWrites(boolean windowedWrites) {
+
+      }
+
+      @Override
+      public void finalize(Iterable<String> writerResults, PipelineOptions 
options)
+          throws Exception {
+
+      }
+
+      @Override
+      public Writer<String, String> createWriter(PipelineOptions options) 
throws Exception {
+        return new MyWriter();
+      }
+
+      @Override
+      public Sink<String> getSink() {
+        return MyCustomSink.this;
+      }
+
+      /**
+       * Simple Writer which writes to a file.
+       */
+      private class MyWriter extends Writer<String, String> {
+
+        private PrintWriter internalWriter;
+
+        @Override
+        public final void openWindowed(String uId,
+                                       BoundedWindow window,
+                                       PaneInfo paneInfo,
+                                       int shard,
+                                       int numShards) throws Exception {
+          throw new UnsupportedOperationException("Windowed writes not supported.");
+        }
+
+        @Override
+        public final void openUnwindowed(String uId, int shard, int numShards) throws Exception {
+          Path path = new Path(resultPath + "/" + uId);
+          FileSystem.get(new URI("file:///")).create(path, false);
+          internalWriter = new PrintWriter(new File(path.toUri()));
+        }
+
+        @Override
+        public void cleanup() throws Exception {
+
+        }
+
+        @Override
+        public void write(String value) throws Exception {
+          internalWriter.println(value);
+        }
+
+        @Override
+        public String close() throws Exception {
+          internalWriter.close();
+          return resultPath;
+        }
+
+        @Override
+        public WriteOperation<String, String> getWriteOperation() {
+          return MyWriteOperation.this;
+        }
+      }
+    }
+  }
+
+}
+
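
To make the contract explicit, here is a hedged sketch of the order in which a runner is expected to drive the Sink lifecycle implemented above, for a single unwindowed bundle. The driver method and bundle id are illustrative, not the runner's actual translation; Collections comes from java.util.

    @SuppressWarnings("unchecked")
    private static <ResultT> void driveSink(Sink<String> sink, PipelineOptions options)
        throws Exception {
      sink.validate(options);
      Sink.WriteOperation<String, ResultT> op =
          (Sink.WriteOperation<String, ResultT>) sink.createWriteOperation(options);
      op.initialize(options);
      op.setWindowedWrites(false);

      // One writer per bundle: open, write the bundle's elements, close for a result.
      Sink.Writer<String, ResultT> writer = op.createWriter(options);
      writer.openUnwindowed("bundle-0", 0, 1);
      writer.write("Joe red 3");
      ResultT result = writer.close();

      // finalize() commits the collected per-bundle results.
      op.finalize(Collections.singletonList(result), options);
    }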
