http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 3566f7e..89243a3 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -18,173 +18,85 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import 
org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-
-import com.google.common.collect.ImmutableList;
 
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
+import java.util.Map;
 
 /**
  * Encapsulates a {@link org.apache.beam.sdk.transforms.DoFn}
  * inside a Flink {@link 
org.apache.flink.api.common.functions.RichMapPartitionFunction}.
  */
-public class FlinkDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, 
OUT> {
+public class FlinkDoFnFunction<InputT, OutputT>
+    extends RichMapPartitionFunction<WindowedValue<InputT>, 
WindowedValue<OutputT>> {
 
-  private final DoFn<IN, OUT> doFn;
+  private final DoFn<InputT, OutputT> doFn;
   private final SerializedPipelineOptions serializedOptions;
 
-  public FlinkDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options) {
-    this.doFn = doFn;
-    this.serializedOptions = new SerializedPipelineOptions(options);
-  }
-
-  @Override
-  public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws 
Exception {
-    ProcessContext context = new ProcessContext(doFn, out);
-    this.doFn.startBundle(context);
-    for (IN value : values) {
-      context.inValue = value;
-      doFn.processElement(context);
-    }
-    this.doFn.finishBundle(context);
-  }
-  
-  private class ProcessContext extends DoFn<IN, OUT>.ProcessContext {
-
-    IN inValue;
-    Collector<OUT> outCollector;
-
-    public ProcessContext(DoFn<IN, OUT> fn, Collector<OUT> outCollector) {
-      fn.super();
-      super.setupDelegateAggregators();
-      this.outCollector = outCollector;
-    }
-
-    @Override
-    public IN element() {
-      return this.inValue;
-    }
-
+  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
-    @Override
-    public Instant timestamp() {
-      return Instant.now();
-    }
+  private final boolean requiresWindowAccess;
+  private final boolean hasSideInputs;
 
-    @Override
-    public BoundedWindow window() {
-      return GlobalWindow.INSTANCE;
-    }
-
-    @Override
-    public PaneInfo pane() {
-      return PaneInfo.NO_FIRING;
-    }
+  private final WindowingStrategy<?, ?> windowingStrategy;
 
-    @Override
-    public WindowingInternals<IN, OUT> windowingInternals() {
-      return new WindowingInternals<IN, OUT>() {
-        @Override
-        public StateInternals stateInternals() {
-          return null;
-        }
-
-        @Override
-        public void outputWindowedValue(OUT output, Instant timestamp, 
Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-
-        }
-
-        @Override
-        public TimerInternals timerInternals() {
-          return null;
-        }
+  public FlinkDoFnFunction(
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      PipelineOptions options) {
+    this.doFn = doFn;
+    this.sideInputs = sideInputs;
+    this.serializedOptions = new SerializedPipelineOptions(options);
+    this.windowingStrategy = windowingStrategy;
 
-        @Override
-        public Collection<? extends BoundedWindow> windows() {
-          return ImmutableList.of(GlobalWindow.INSTANCE);
-        }
+    this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess;
+    this.hasSideInputs = !sideInputs.isEmpty();
+  }
 
-        @Override
-        public PaneInfo pane() {
-          return PaneInfo.NO_FIRING;
-        }
+  @Override
+  public void mapPartition(
+      Iterable<WindowedValue<InputT>> values,
+      Collector<WindowedValue<OutputT>> out) throws Exception {
+
+    FlinkProcessContext<InputT, OutputT> context = new FlinkProcessContext<>(
+        serializedOptions.getPipelineOptions(),
+        getRuntimeContext(),
+        doFn,
+        windowingStrategy,
+        out,
+        sideInputs);
 
-        @Override
-        public <T> void writePCollectionViewData(TupleTag<?> tag, 
Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
-        }
+    this.doFn.startBundle(context);
 
-        @Override
-        public <T> T sideInput(PCollectionView<T> view, BoundedWindow 
mainInputWindow) {
-          throw new RuntimeException("sideInput() not implemented.");
+    if (!requiresWindowAccess || hasSideInputs) {
+      // we don't need to explode the windows
+      for (WindowedValue<InputT> value : values) {
+        context = context.forWindowedValue(value);
+        doFn.processElement(context);
+      }
+    } else {
+      // we need to explode the windows because we have per-window
+      // side inputs and window access also only works if an element
+      // is in only one window
+      for (WindowedValue<InputT> value : values) {
+        for (WindowedValue<InputT> explodedValue: value.explodeWindows()) {
+          context = context.forWindowedValue(value);
+          doFn.processElement(context);
         }
-      };
-    }
-
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return serializedOptions.getPipelineOptions();
-    }
-
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      List<T> sideInput = 
getRuntimeContext().getBroadcastVariable(view.getTagInternal().getId());
-      List<WindowedValue<?>> windowedValueList = new 
ArrayList<>(sideInput.size());
-      for (T input : sideInput) {
-        windowedValueList.add(WindowedValue.of(input, Instant.now(), 
ImmutableList.of(GlobalWindow.INSTANCE), pane()));
       }
-      return view.fromIterableInternal(windowedValueList);
     }
 
-    @Override
-    public void output(OUT output) {
-      outCollector.collect(output);
-    }
-
-    @Override
-    public void outputWithTimestamp(OUT output, Instant timestamp) {
-      // not FLink's way, just output normally
-      output(output);
-    }
-
-    @Override
-    public <T> void sideOutput(TupleTag<T> tag, T output) {
-      // ignore the side output, this can happen when a user does not register
-      // side outputs but then outputs using a freshly created TupleTag.
-    }
-
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-      sideOutput(tag, output);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, 
AggOutputT> combiner) {
-      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new 
SerializableFnAggregatorWrapper<>(combiner);
-      getRuntimeContext().addAccumulator(name, wrapper);
-      return wrapper;
-    }
-
-
+    // set the windowed value to null so that the logic
+    // or outputting in finishBundle kicks in
+    context = context.forWindowedValue(null);
+    this.doFn.finishBundle(context);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java
deleted file mode 100644
index 7c7084d..0000000
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import org.apache.beam.sdk.values.KV;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.Iterator;
-
-/**
- * Flink {@link org.apache.flink.api.common.functions.GroupReduceFunction} for 
executing a
- * {@link org.apache.beam.sdk.transforms.GroupByKey} operation. This reads the 
input
- * {@link org.apache.beam.sdk.values.KV} elements, extracts the key and 
collects
- * the values in a {@code List}.
- */
-public class FlinkKeyedListAggregationFunction<K,V> implements 
GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> {
-
-  @Override
-  public void reduce(Iterable<KV<K, V>> values, Collector<KV<K, Iterable<V>>> 
out) throws Exception {
-    Iterator<KV<K, V>> it = values.iterator();
-    KV<K, V> first = it.next();
-    Iterable<V> passThrough = new PassThroughIterable<>(first, it);
-    out.collect(KV.of(first.getKey(), passThrough));
-  }
-
-  private static class PassThroughIterable<K, V> implements Iterable<V>, 
Iterator<V>  {
-    private KV<K, V> first;
-    private Iterator<KV<K, V>> iterator;
-
-    public PassThroughIterable(KV<K, V> first, Iterator<KV<K, V>> iterator) {
-      this.first = first;
-      this.iterator = iterator;
-    }
-
-    @Override
-    public Iterator<V> iterator() {
-      return this;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return first != null || iterator.hasNext();
-    }
-
-    @Override
-    public V next() {
-      if (first != null) {
-        V result = first.getValue();
-        first = null;
-        return result;
-      } else {
-        return iterator.next().getValue();
-      }
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException("Cannot remove elements from 
input.");
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
new file mode 100644
index 0000000..9074d72
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
+import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Special version of {@link FlinkReduceFunction} that supports merging 
windows. This
+ * assumes that the windows are {@link IntervalWindow IntervalWindows} and 
exhibits the
+ * same behaviour as {@code MergeOverlappingIntervalWindows}.
+ *
+ * <p>This is different from the pair of function for the non-merging windows 
case
+ * in that we cannot do combining before the shuffle because elements would not
+ * yet be in their correct windows for side-input access.
+ */
+public class FlinkMergingNonShuffleReduceFunction<
+      K, InputT, AccumT, OutputT, W extends IntervalWindow>
+    extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, 
WindowedValue<KV<K, OutputT>>> {
+
+  private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> 
combineFn;
+
+  private final DoFn<KV<K, InputT>, KV<K, OutputT>> doFn;
+
+  private final WindowingStrategy<?, W> windowingStrategy;
+
+  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+  private final SerializedPipelineOptions serializedOptions;
+
+  public FlinkMergingNonShuffleReduceFunction(
+      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn,
+      WindowingStrategy<?, W> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      PipelineOptions pipelineOptions) {
+
+    this.combineFn = keyedCombineFn;
+
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputs = sideInputs;
+
+    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+
+    // dummy DoFn because we need one for ProcessContext
+    this.doFn = new DoFn<KV<K, InputT>, KV<K, OutputT>>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+
+      }
+    };
+  }
+
+  @Override
+  public void reduce(
+      Iterable<WindowedValue<KV<K, InputT>>> elements,
+      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
+
+    FlinkProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext =
+        new FlinkProcessContext<>(
+            serializedOptions.getPipelineOptions(),
+            getRuntimeContext(),
+            doFn,
+            windowingStrategy,
+            out,
+            sideInputs);
+
+    PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
+        PerKeyCombineFnRunners.create(combineFn);
+
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) 
windowingStrategy.getOutputTimeFn();
+
+    // get all elements so that we can sort them, has to fit into
+    // memory
+    // this seems very unprudent, but correct, for now
+    List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
+    for (WindowedValue<KV<K, InputT>> inputValue: elements) {
+      for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) 
{
+        sortedInput.add(exploded);
+      }
+    }
+    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, 
InputT>>>() {
+      @Override
+      public int compare(
+          WindowedValue<KV<K, InputT>> o1,
+          WindowedValue<KV<K, InputT>> o2) {
+        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
+            
.compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
+      }
+    });
+
+    // merge windows, we have to do it in an extra pre-processing step and
+    // can't do it as we go since the window of early elements would not
+    // be correct when calling the CombineFn
+    mergeWindow(sortedInput);
+
+    // iterate over the elements that are sorted by window timestamp
+    final Iterator<WindowedValue<KV<K, InputT>>> iterator = 
sortedInput.iterator();
+
+    // create accumulator using the first elements key
+    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
+    K key = currentValue.getValue().getKey();
+    IntervalWindow currentWindow =
+        (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
+    InputT firstValue = currentValue.getValue().getValue();
+    processContext = processContext.forWindowedValue(currentValue);
+    AccumT accumulator = combineFnRunner.createAccumulator(key, 
processContext);
+    accumulator = combineFnRunner.addInput(key, accumulator, firstValue, 
processContext);
+
+    // we use this to keep track of the timestamps assigned by the OutputTimeFn
+    Instant windowTimestamp =
+        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), 
currentWindow);
+
+    while (iterator.hasNext()) {
+      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
+      IntervalWindow nextWindow = (IntervalWindow) 
Iterables.getOnlyElement(nextValue.getWindows());
+
+      if (currentWindow.equals(nextWindow)) {
+        // continue accumulating and merge windows
+
+        InputT value = nextValue.getValue().getValue();
+        processContext = processContext.forWindowedValue(nextValue);
+        accumulator = combineFnRunner.addInput(key, accumulator, value, 
processContext);
+
+        windowTimestamp = outputTimeFn.combine(
+            windowTimestamp,
+            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), 
currentWindow));
+
+      } else {
+        // emit the value that we currently have
+        out.collect(
+            WindowedValue.of(
+                KV.of(key, combineFnRunner.extractOutput(key, accumulator, 
processContext)),
+                windowTimestamp,
+                currentWindow,
+                PaneInfo.NO_FIRING));
+
+        currentWindow = nextWindow;
+        InputT value = nextValue.getValue().getValue();
+        processContext = processContext.forWindowedValue(nextValue);
+        accumulator = combineFnRunner.createAccumulator(key, processContext);
+        accumulator = combineFnRunner.addInput(key, accumulator, value, 
processContext);
+        windowTimestamp = 
outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+      }
+    }
+
+    // emit the final accumulator
+    out.collect(
+        WindowedValue.of(
+            KV.of(key, combineFnRunner.extractOutput(key, accumulator, 
processContext)),
+            windowTimestamp,
+            currentWindow,
+            PaneInfo.NO_FIRING));
+  }
+
+  /**
+   * Merge windows. This assumes that the list of elements is sorted by 
window-end timestamp.
+   * This replaces windows in the input list.
+   */
+  private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
+    int currentStart = 0;
+    IntervalWindow currentWindow =
+        (IntervalWindow) 
Iterables.getOnlyElement(elements.get(0).getWindows());
+
+    for (int i = 1; i < elements.size(); i++) {
+      WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
+      IntervalWindow nextWindow =
+          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
+      if (currentWindow.intersects(nextWindow)) {
+        // we continue
+        currentWindow = currentWindow.span(nextWindow);
+      } else {
+        // retrofit the merged window to all windows up to "currentStart"
+        for (int j = i - 1; j >= currentStart; j--) {
+          WindowedValue<KV<K, InputT>> value = elements.get(j);
+          elements.set(
+              j,
+              WindowedValue.of(
+                  value.getValue(), value.getTimestamp(), currentWindow, 
value.getPane()));
+        }
+        currentStart = i;
+        currentWindow = nextWindow;
+      }
+    }
+    if (currentStart < elements.size() - 1) {
+      // we have to retrofit the last batch
+      for (int j = elements.size() - 1; j >= currentStart; j--) {
+        WindowedValue<KV<K, InputT>> value = elements.get(j);
+        elements.set(
+            j,
+            WindowedValue.of(
+                value.getValue(), value.getTimestamp(), currentWindow, 
value.getPane()));
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
new file mode 100644
index 0000000..c12e420
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
+import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Special version of {@link FlinkPartialReduceFunction} that supports merging 
windows. This
+ * assumes that the windows are {@link IntervalWindow IntervalWindows} and 
exhibits the
+ * same behaviour as {@code MergeOverlappingIntervalWindows}.
+ */
+public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends 
IntervalWindow>
+  extends FlinkPartialReduceFunction<K, InputT, AccumT, W> {
+
+  public FlinkMergingPartialReduceFunction(
+      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
+      WindowingStrategy<?, W> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      PipelineOptions pipelineOptions) {
+    super(combineFn, windowingStrategy, sideInputs, pipelineOptions);
+  }
+
+  @Override
+  public void combine(
+      Iterable<WindowedValue<KV<K, InputT>>> elements,
+      Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
+
+    FlinkProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext =
+        new FlinkProcessContext<>(
+            serializedOptions.getPipelineOptions(),
+            getRuntimeContext(),
+            doFn,
+            windowingStrategy,
+            out,
+            sideInputs);
+
+    PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
+        PerKeyCombineFnRunners.create(combineFn);
+
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) 
windowingStrategy.getOutputTimeFn();
+
+    // get all elements so that we can sort them, has to fit into
+    // memory
+    // this seems very unprudent, but correct, for now
+    List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
+    for (WindowedValue<KV<K, InputT>> inputValue: elements) {
+      for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) 
{
+        sortedInput.add(exploded);
+      }
+    }
+    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, 
InputT>>>() {
+      @Override
+      public int compare(
+          WindowedValue<KV<K, InputT>> o1,
+          WindowedValue<KV<K, InputT>> o2) {
+        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
+            
.compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
+      }
+    });
+
+    // merge windows, we have to do it in an extra pre-processing step and
+    // can't do it as we go since the window of early elements would not
+    // be correct when calling the CombineFn
+    mergeWindow(sortedInput);
+
+    // iterate over the elements that are sorted by window timestamp
+    final Iterator<WindowedValue<KV<K, InputT>>> iterator = 
sortedInput.iterator();
+
+    // create accumulator using the first elements key
+    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
+    K key = currentValue.getValue().getKey();
+    IntervalWindow currentWindow =
+        (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
+    InputT firstValue = currentValue.getValue().getValue();
+    processContext = processContext.forWindowedValue(currentValue);
+    AccumT accumulator = combineFnRunner.createAccumulator(key, 
processContext);
+    accumulator = combineFnRunner.addInput(key, accumulator, firstValue, 
processContext);
+
+    // we use this to keep track of the timestamps assigned by the OutputTimeFn
+    Instant windowTimestamp =
+        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), 
currentWindow);
+
+    while (iterator.hasNext()) {
+      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
+      IntervalWindow nextWindow = (IntervalWindow) 
Iterables.getOnlyElement(nextValue.getWindows());
+
+      if (currentWindow.equals(nextWindow)) {
+        // continue accumulating and merge windows
+
+        InputT value = nextValue.getValue().getValue();
+        processContext = processContext.forWindowedValue(nextValue);
+        accumulator = combineFnRunner.addInput(key, accumulator, value, 
processContext);
+
+        windowTimestamp = outputTimeFn.combine(
+            windowTimestamp,
+            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), 
currentWindow));
+
+      } else {
+        // emit the value that we currently have
+        out.collect(
+            WindowedValue.of(
+                KV.of(key, accumulator),
+                windowTimestamp,
+                currentWindow,
+                PaneInfo.NO_FIRING));
+
+        currentWindow = nextWindow;
+        InputT value = nextValue.getValue().getValue();
+        processContext = processContext.forWindowedValue(nextValue);
+        accumulator = combineFnRunner.createAccumulator(key, processContext);
+        accumulator = combineFnRunner.addInput(key, accumulator, value, 
processContext);
+        windowTimestamp = 
outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+      }
+    }
+
+    // emit the final accumulator
+    out.collect(
+        WindowedValue.of(
+            KV.of(key, accumulator),
+            windowTimestamp,
+            currentWindow,
+            PaneInfo.NO_FIRING));
+  }
+
+  /**
+   * Merge windows. This assumes that the list of elements is sorted by 
window-end timestamp.
+   * This replaces windows in the input list.
+   */
+  private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) {
+    int currentStart = 0;
+    IntervalWindow currentWindow =
+        (IntervalWindow) 
Iterables.getOnlyElement(elements.get(0).getWindows());
+
+    for (int i = 1; i < elements.size(); i++) {
+      WindowedValue<KV<K, InputT>> nextValue = elements.get(i);
+      IntervalWindow nextWindow =
+          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
+      if (currentWindow.intersects(nextWindow)) {
+        // we continue
+        currentWindow = currentWindow.span(nextWindow);
+      } else {
+        // retrofit the merged window to all windows up to "currentStart"
+        for (int j = i - 1; j >= currentStart; j--) {
+          WindowedValue<KV<K, InputT>> value = elements.get(j);
+          elements.set(
+              j,
+              WindowedValue.of(
+                  value.getValue(), value.getTimestamp(), currentWindow, 
value.getPane()));
+        }
+        currentStart = i;
+        currentWindow = nextWindow;
+      }
+    }
+    if (currentStart < elements.size() - 1) {
+      // we have to retrofit the last batch
+      for (int j = elements.size() - 1; j >= currentStart; j--) {
+        WindowedValue<KV<K, InputT>> value = elements.get(j);
+        elements.set(
+            j,
+            WindowedValue.of(
+                value.getValue(), value.getTimestamp(), currentWindow, 
value.getPane()));
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
new file mode 100644
index 0000000..07d1c97
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
+import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Special version of {@link FlinkReduceFunction} that supports merging 
windows. This
+ * assumes that the windows are {@link IntervalWindow IntervalWindows} and 
exhibits the
+ * same behaviour as {@code MergeOverlappingIntervalWindows}.
+ */
+public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends 
IntervalWindow>
+    extends FlinkReduceFunction<K, AccumT, OutputT, W> {
+
+  public FlinkMergingReduceFunction(
+      CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn,
+      WindowingStrategy<?, W> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      PipelineOptions pipelineOptions) {
+    super(keyedCombineFn, windowingStrategy, sideInputs, pipelineOptions);
+  }
+
+  @Override
+  public void reduce(
+      Iterable<WindowedValue<KV<K, AccumT>>> elements,
+      Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
+
+    FlinkProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
+        new FlinkProcessContext<>(
+            serializedOptions.getPipelineOptions(),
+            getRuntimeContext(),
+            doFn,
+            windowingStrategy,
+            out,
+            sideInputs);
+
+    PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
+        PerKeyCombineFnRunners.create(combineFn);
+
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) 
windowingStrategy.getOutputTimeFn();
+
+
+    // get all elements so that we can sort them, has to fit into
+    // memory
+    // this seems very unprudent, but correct, for now
+    ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList();
+    for (WindowedValue<KV<K, AccumT>> inputValue: elements) {
+      for (WindowedValue<KV<K, AccumT>> exploded: inputValue.explodeWindows()) 
{
+        sortedInput.add(exploded);
+      }
+    }
+    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, 
AccumT>>>() {
+      @Override
+      public int compare(
+          WindowedValue<KV<K, AccumT>> o1,
+          WindowedValue<KV<K, AccumT>> o2) {
+        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
+            
.compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
+      }
+    });
+
+    // merge windows, we have to do it in an extra pre-processing step and
+    // can't do it as we go since the window of early elements would not
+    // be correct when calling the CombineFn
+    mergeWindow(sortedInput);
+
+    // iterate over the elements that are sorted by window timestamp
+    final Iterator<WindowedValue<KV<K, AccumT>>> iterator = 
sortedInput.iterator();
+
+    // get the first accumulator
+    WindowedValue<KV<K, AccumT>> currentValue = iterator.next();
+    K key = currentValue.getValue().getKey();
+    IntervalWindow currentWindow =
+        (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
+    AccumT accumulator = currentValue.getValue().getValue();
+
+    // we use this to keep track of the timestamps assigned by the 
OutputTimeFn,
+    // in FlinkPartialReduceFunction we already merge the timestamps assigned
+    // to individual elements, here we just merge them
+    List<Instant> windowTimestamps = new ArrayList<>();
+    windowTimestamps.add(currentValue.getTimestamp());
+
+    while (iterator.hasNext()) {
+      WindowedValue<KV<K, AccumT>> nextValue = iterator.next();
+      IntervalWindow nextWindow =
+          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
+
+      if (nextWindow.equals(currentWindow)) {
+        // continue accumulating and merge windows
+
+        processContext = processContext.forWindowedValue(nextValue);
+
+        accumulator = combineFnRunner.mergeAccumulators(
+            key, ImmutableList.of(accumulator, 
nextValue.getValue().getValue()), processContext);
+
+        windowTimestamps.add(nextValue.getTimestamp());
+      } else {
+        out.collect(
+            WindowedValue.of(
+                KV.of(key, combineFnRunner.extractOutput(key, accumulator, 
processContext)),
+                outputTimeFn.merge(currentWindow, windowTimestamps),
+                currentWindow,
+                PaneInfo.NO_FIRING));
+
+        windowTimestamps.clear();
+
+        processContext = processContext.forWindowedValue(nextValue);
+
+        currentWindow = nextWindow;
+        accumulator = nextValue.getValue().getValue();
+        windowTimestamps.add(nextValue.getTimestamp());
+      }
+    }
+
+    // emit the final accumulator
+    out.collect(
+        WindowedValue.of(
+            KV.of(key, combineFnRunner.extractOutput(key, accumulator, 
processContext)),
+            outputTimeFn.merge(currentWindow, windowTimestamps),
+            currentWindow,
+            PaneInfo.NO_FIRING));
+  }
+
+  /**
+   * Merge windows. This assumes that the list of elements is sorted by 
window-end timestamp.
+   * This replaces windows in the input list.
+   */
+  private void mergeWindow(List<WindowedValue<KV<K, AccumT>>> elements) {
+    int currentStart = 0;
+    IntervalWindow currentWindow =
+        (IntervalWindow) 
Iterables.getOnlyElement(elements.get(0).getWindows());
+
+    for (int i = 1; i < elements.size(); i++) {
+      WindowedValue<KV<K, AccumT>> nextValue = elements.get(i);
+      IntervalWindow nextWindow =
+          (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows());
+      if (currentWindow.intersects(nextWindow)) {
+        // we continue
+        currentWindow = currentWindow.span(nextWindow);
+      } else {
+        // retrofit the merged window to all windows up to "currentStart"
+        for (int j = i - 1; j >= currentStart; j--) {
+          WindowedValue<KV<K, AccumT>> value = elements.get(j);
+          elements.set(
+              j,
+              WindowedValue.of(
+                  value.getValue(), value.getTimestamp(), currentWindow, 
value.getPane()));
+        }
+        currentStart = i;
+        currentWindow = nextWindow;
+      }
+    }
+    if (currentStart < elements.size() - 1) {
+      // we have to retrofit the last batch
+      for (int j = elements.size() - 1; j >= currentStart; j--) {
+        WindowedValue<KV<K, AccumT>> value = elements.get(j);
+        elements.set(
+            j,
+            WindowedValue.of(
+                value.getValue(), value.getTimestamp(), currentWindow, 
value.getPane()));
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index 476dc5e..f92e76f 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -18,28 +18,17 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
-import 
org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
 import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 
-import com.google.common.collect.ImmutableList;
-
 import org.apache.flink.api.common.functions.RichMapPartitionFunction;
 import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Map;
 
 /**
@@ -50,112 +39,72 @@ import java.util.Map;
  * and must tag all outputs with the output number. Afterwards a filter will 
filter out
  * those elements that are not to be in a specific output.
  */
-public class FlinkMultiOutputDoFnFunction<IN, OUT> extends 
RichMapPartitionFunction<IN, RawUnionValue> {
-
-  private final DoFn<IN, OUT> doFn;
-  private final SerializedPipelineOptions serializedPipelineOptions;
-  private final Map<TupleTag<?>, Integer> outputMap;
-
-  public FlinkMultiOutputDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions 
options, Map<TupleTag<?>, Integer> outputMap) {
-    this.doFn = doFn;
-    this.serializedPipelineOptions = new SerializedPipelineOptions(options);
-    this.outputMap = outputMap;
-  }
-
-  @Override
-  public void mapPartition(Iterable<IN> values, Collector<RawUnionValue> out) 
throws Exception {
-    ProcessContext context = new ProcessContext(doFn, out);
-    this.doFn.startBundle(context);
-    for (IN value : values) {
-      context.inValue = value;
-      doFn.processElement(context);
-    }
-    this.doFn.finishBundle(context);
-  }
+public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
+    extends RichMapPartitionFunction<WindowedValue<InputT>, 
WindowedValue<RawUnionValue>> {
 
-  private class ProcessContext extends DoFn<IN, OUT>.ProcessContext {
+  private final DoFn<InputT, OutputT> doFn;
+  private final SerializedPipelineOptions serializedOptions;
 
-    IN inValue;
-    Collector<RawUnionValue> outCollector;
+  private final Map<TupleTag<?>, Integer> outputMap;
 
-    public ProcessContext(DoFn<IN, OUT> fn, Collector<RawUnionValue> 
outCollector) {
-      fn.super();
-      this.outCollector = outCollector;
-    }
+  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
-    @Override
-    public IN element() {
-      return this.inValue;
-    }
+  private final boolean requiresWindowAccess;
+  private final boolean hasSideInputs;
 
-    @Override
-    public Instant timestamp() {
-      return Instant.now();
-    }
+  private final WindowingStrategy<?, ?> windowingStrategy;
 
-    @Override
-    public BoundedWindow window() {
-      return GlobalWindow.INSTANCE;
-    }
+  public FlinkMultiOutputDoFnFunction(
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      PipelineOptions options,
+      Map<TupleTag<?>, Integer> outputMap) {
+    this.doFn = doFn;
+    this.serializedOptions = new SerializedPipelineOptions(options);
+    this.outputMap = outputMap;
 
-    @Override
-    public PaneInfo pane() {
-      return PaneInfo.NO_FIRING;
-    }
+    this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess;
+    this.hasSideInputs = !sideInputs.isEmpty();
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputs = sideInputs;
+  }
 
-    @Override
-    public WindowingInternals<IN, OUT> windowingInternals() {
-      return null;
-    }
+  @Override
+  public void mapPartition(
+      Iterable<WindowedValue<InputT>> values,
+      Collector<WindowedValue<RawUnionValue>> out) throws Exception {
+
+    FlinkProcessContext<InputT, OutputT> context = new 
FlinkMultiOutputProcessContext<>(
+        serializedOptions.getPipelineOptions(),
+        getRuntimeContext(),
+        doFn,
+        windowingStrategy,
+        out,
+        outputMap,
+        sideInputs);
 
-    @Override
-    public PipelineOptions getPipelineOptions() {
-      return serializedPipelineOptions.getPipelineOptions();
-    }
+    this.doFn.startBundle(context);
 
-    @Override
-    public <T> T sideInput(PCollectionView<T> view) {
-      List<T> sideInput = 
getRuntimeContext().getBroadcastVariable(view.getTagInternal()
-          .getId());
-      List<WindowedValue<?>> windowedValueList = new 
ArrayList<>(sideInput.size());
-      for (T input : sideInput) {
-        windowedValueList.add(WindowedValue.of(input, Instant.now(), 
ImmutableList.of(GlobalWindow.INSTANCE), pane()));
+    if (!requiresWindowAccess || hasSideInputs) {
+      // we don't need to explode the windows
+      for (WindowedValue<InputT> value : values) {
+        context = context.forWindowedValue(value);
+        doFn.processElement(context);
       }
-      return view.fromIterableInternal(windowedValueList);
-    }
-
-    @Override
-    public void output(OUT value) {
-      // assume that index 0 is the default output
-      outCollector.collect(new RawUnionValue(0, value));
-    }
-
-    @Override
-    public void outputWithTimestamp(OUT output, Instant timestamp) {
-      // not FLink's way, just output normally
-      output(output);
-    }
-
-    @Override
-    @SuppressWarnings("unchecked")
-    public <T> void sideOutput(TupleTag<T> tag, T value) {
-      Integer index = outputMap.get(tag);
-      if (index != null) {
-        outCollector.collect(new RawUnionValue(index, value));
+    } else {
+      // we need to explode the windows because we have per-window
+      // side inputs and window access also only works if an element
+      // is in only one window
+      for (WindowedValue<InputT> value : values) {
+        for (WindowedValue<InputT> explodedValue: value.explodeWindows()) {
+          context = context.forWindowedValue(value);
+          doFn.processElement(context);
+        }
       }
     }
 
-    @Override
-    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
-      sideOutput(tag, output);
-    }
-
-    @Override
-    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, 
AggOutputT> combiner) {
-      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new 
SerializableFnAggregatorWrapper<>(combiner);
-      getRuntimeContext().addAccumulator(name, wrapper);
-      return null;
-    }
 
+    this.doFn.finishBundle(context);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
new file mode 100644
index 0000000..71b6d27
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * {@link DoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that 
supports
+ * side outputs.
+ */
+class FlinkMultiOutputProcessContext<InputT, OutputT>
+    extends FlinkProcessContext<InputT, OutputT> {
+
+  // we need a different Collector from the base class
+  private final Collector<WindowedValue<RawUnionValue>> collector;
+
+  private final Map<TupleTag<?>, Integer> outputMap;
+
+
+  FlinkMultiOutputProcessContext(
+      PipelineOptions pipelineOptions,
+      RuntimeContext runtimeContext,
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Collector<WindowedValue<RawUnionValue>> collector,
+      Map<TupleTag<?>, Integer> outputMap,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
+    super(
+        pipelineOptions,
+        runtimeContext,
+        doFn,
+        windowingStrategy,
+        new Collector<WindowedValue<OutputT>>() {
+          @Override
+          public void collect(WindowedValue<OutputT> outputTWindowedValue) {
+
+          }
+
+          @Override
+          public void close() {
+
+          }
+        },
+        sideInputs);
+
+    this.collector = collector;
+    this.outputMap = outputMap;
+  }
+
+  @Override
+  public FlinkProcessContext<InputT, OutputT> forWindowedValue(
+      WindowedValue<InputT> windowedValue) {
+    this.windowedValue = windowedValue;
+    return this;
+  }
+
+  @Override
+  public void outputWithTimestamp(OutputT value, Instant timestamp) {
+    if (windowedValue == null) {
+      // we are in startBundle() or finishBundle()
+
+      try {
+        Collection windows = windowingStrategy.getWindowFn().assignWindows(
+            new FlinkNoElementAssignContext(
+                windowingStrategy.getWindowFn(),
+                value,
+                timestamp));
+
+        collector.collect(
+            WindowedValue.of(
+                new RawUnionValue(0, value),
+                timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
+                windows,
+                PaneInfo.NO_FIRING));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      collector.collect(
+          WindowedValue.of(
+              new RawUnionValue(0, value),
+              windowedValue.getTimestamp(),
+              windowedValue.getWindows(),
+              windowedValue.getPane()));
+    }
+  }
+
+  @Override
+  protected void outputWithTimestampAndWindow(
+      OutputT value,
+      Instant timestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    collector.collect(
+        WindowedValue.of(
+            new RawUnionValue(0, value), timestamp, windows, pane));
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public <T> void sideOutput(TupleTag<T> tag, T value) {
+    if (windowedValue != null) {
+      sideOutputWithTimestamp(tag, value, windowedValue.getTimestamp());
+    } else {
+      sideOutputWithTimestamp(tag, value, null);
+    }
+  }
+
+  @Override
+  public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T value, Instant 
timestamp) {
+    Integer index = outputMap.get(tag);
+
+    if (index == null) {
+      throw new IllegalArgumentException("Unknown side output tag: " + tag);
+    }
+
+    if (windowedValue == null) {
+      // we are in startBundle() or finishBundle()
+
+      try {
+        Collection windows = windowingStrategy.getWindowFn().assignWindows(
+            new FlinkNoElementAssignContext(
+                windowingStrategy.getWindowFn(),
+                value,
+                timestamp));
+
+        collector.collect(
+            WindowedValue.of(
+                new RawUnionValue(index, value),
+                timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
+                windows,
+                PaneInfo.NO_FIRING));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      collector.collect(
+          WindowedValue.of(
+              new RawUnionValue(index, value),
+              windowedValue.getTimestamp(),
+              windowedValue.getWindows(),
+              windowedValue.getPane()));
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
index 58a36b2..9205a55 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java
@@ -18,27 +18,34 @@
 package org.apache.beam.runners.flink.translation.functions;
 
 import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.util.WindowedValue;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.util.Collector;
 
 /**
- * A FlatMap function that filters out those elements that don't belong in 
this output. We need
- * this to implement MultiOutput ParDo functions.
+ * A {@link FlatMapFunction} function that filters out those elements that 
don't belong in this
+ * output. We need this to implement MultiOutput ParDo functions in 
combination with
+ * {@link FlinkMultiOutputDoFnFunction}.
  */
-public class FlinkMultiOutputPruningFunction<T> implements 
FlatMapFunction<RawUnionValue, T> {
+public class FlinkMultiOutputPruningFunction<T>
+    implements FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<T>> 
{
 
-  private final int outputTag;
+  private final int ourOutputTag;
 
-  public FlinkMultiOutputPruningFunction(int outputTag) {
-    this.outputTag = outputTag;
+  public FlinkMultiOutputPruningFunction(int ourOutputTag) {
+    this.ourOutputTag = ourOutputTag;
   }
 
   @Override
   @SuppressWarnings("unchecked")
-  public void flatMap(RawUnionValue rawUnionValue, Collector<T> collector) 
throws Exception {
-    if (rawUnionValue.getUnionTag() == outputTag) {
-      collector.collect((T) rawUnionValue.getValue());
+  public void flatMap(
+      WindowedValue<RawUnionValue> windowedValue,
+      Collector<WindowedValue<T>> collector) throws Exception {
+    int unionTag = windowedValue.getValue().getUnionTag();
+    if (unionTag == ourOutputTag) {
+      collector.collect(
+          (WindowedValue<T>) 
windowedValue.withValue(windowedValue.getValue().getValue()));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
new file mode 100644
index 0000000..892f7a1
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+/**
+ * {@link WindowFn.AssignContext} for calling a {@link WindowFn} for elements 
emitted from
+ * {@link org.apache.beam.sdk.transforms.DoFn#startBundle(DoFn.Context)}
+ * or {@link DoFn#finishBundle(DoFn.Context)}.
+ *
+ * <p>In those cases the {@code WindowFn} is not allowed to access any element 
information.
+ */
+class FlinkNoElementAssignContext<InputT, W extends BoundedWindow>
+    extends WindowFn<InputT, W>.AssignContext {
+
+  private final InputT element;
+  private final Instant timestamp;
+
+  FlinkNoElementAssignContext(
+      WindowFn<InputT, W> fn,
+      InputT element,
+      Instant timestamp) {
+    fn.super();
+
+    this.element = element;
+    // the timestamp can be null, in that case output is called
+    // without a timestamp
+    this.timestamp = timestamp;
+  }
+
+  @Override
+  public InputT element() {
+    return element;
+  }
+
+  @Override
+  public Instant timestamp() {
+    if (timestamp != null) {
+      return timestamp;
+    } else {
+      throw new UnsupportedOperationException("No timestamp available.");
+    }
+  }
+
+  @Override
+  public Collection<? extends BoundedWindow> windows() {
+    throw new UnsupportedOperationException("No windows available.");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index a2bab2b..c29e1df 100644
--- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -17,45 +17,170 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import org.apache.beam.sdk.transforms.Combine;
+import 
org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.CombineFnBase;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.PerKeyCombineFnRunner;
+import org.apache.beam.sdk.util.PerKeyCombineFnRunners;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
 
-import org.apache.flink.api.common.functions.GroupCombineFunction;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.flink.api.common.functions.RichGroupCombineFunction;
 import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Iterator;
+import java.util.Map;
 
 /**
- * Flink {@link org.apache.flink.api.common.functions.GroupCombineFunction} 
for executing a
- * {@link org.apache.beam.sdk.transforms.Combine.PerKey} operation. This reads 
the input
- * {@link org.apache.beam.sdk.values.KV} elements VI, extracts the key and 
emits accumulated
- * values which have the intermediate format VA.
+ * This is is the first step for executing a {@link 
org.apache.beam.sdk.transforms.Combine.PerKey}
+ * on Flink. The second part is {@link FlinkReduceFunction}. This function 
performs a local
+ * combine step before shuffling while the latter does the final combination 
after a shuffle.
+ *
+ * <p>The input to {@link #combine(Iterable, Collector)} are elements of the 
same key but
+ * for different windows. We have to ensure that we only combine elements of 
matching
+ * windows.
  */
-public class FlinkPartialReduceFunction<K, VI, VA> implements 
GroupCombineFunction<KV<K, VI>, KV<K, VA>> {
+public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends 
BoundedWindow>
+    extends RichGroupCombineFunction<WindowedValue<KV<K, InputT>>, 
WindowedValue<KV<K, AccumT>>> {
+
+  protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> 
combineFn;
+
+  protected final DoFn<KV<K, InputT>, KV<K, AccumT>> doFn;
+
+  protected final WindowingStrategy<?, W> windowingStrategy;
+
+  protected final SerializedPipelineOptions serializedOptions;
 
-  private final Combine.KeyedCombineFn<K, VI, VA, ?> keyedCombineFn;
+  protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
 
-  public FlinkPartialReduceFunction(Combine.KeyedCombineFn<K, VI, VA, ?>
-                                        keyedCombineFn) {
-    this.keyedCombineFn = keyedCombineFn;
+  public FlinkPartialReduceFunction(
+      CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn,
+      WindowingStrategy<?, W> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+      PipelineOptions pipelineOptions) {
+
+    this.combineFn = combineFn;
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputs = sideInputs;
+    this.serializedOptions = new SerializedPipelineOptions(pipelineOptions);
+
+    // dummy DoFn because we need one for ProcessContext
+    this.doFn = new DoFn<KV<K, InputT>, KV<K, AccumT>>() {
+      @Override
+      public void processElement(ProcessContext c) throws Exception {
+
+      }
+    };
   }
 
   @Override
-  public void combine(Iterable<KV<K, VI>> elements, Collector<KV<K, VA>> out) 
throws Exception {
+  public void combine(
+      Iterable<WindowedValue<KV<K, InputT>>> elements,
+      Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
+
+    FlinkProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext =
+        new FlinkProcessContext<>(
+            serializedOptions.getPipelineOptions(),
+            getRuntimeContext(),
+            doFn,
+            windowingStrategy,
+            out,
+            sideInputs);
+
+    PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
+        PerKeyCombineFnRunners.create(combineFn);
+
+    @SuppressWarnings("unchecked")
+    OutputTimeFn<? super BoundedWindow> outputTimeFn =
+        (OutputTimeFn<? super BoundedWindow>) 
windowingStrategy.getOutputTimeFn();
+
+    // get all elements so that we can sort them, has to fit into
+    // memory
+    // this seems very unprudent, but correct, for now
+    ArrayList<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList();
+    for (WindowedValue<KV<K, InputT>> inputValue: elements) {
+      for (WindowedValue<KV<K, InputT>> exploded: inputValue.explodeWindows()) 
{
+        sortedInput.add(exploded);
+      }
+    }
+    Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, 
InputT>>>() {
+      @Override
+      public int compare(
+          WindowedValue<KV<K, InputT>> o1,
+          WindowedValue<KV<K, InputT>> o2) {
+        return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp()
+            
.compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp());
+      }
+    });
+
+    // iterate over the elements that are sorted by window timestamp
+    //
+    final Iterator<WindowedValue<KV<K, InputT>>> iterator = 
sortedInput.iterator();
 
-    final Iterator<KV<K, VI>> iterator = elements.iterator();
     // create accumulator using the first elements key
-    KV<K, VI> first = iterator.next();
-    K key = first.getKey();
-    VI value = first.getValue();
-    VA accumulator = keyedCombineFn.createAccumulator(key);
-    accumulator = keyedCombineFn.addInput(key, accumulator, value);
-
-    while(iterator.hasNext()) {
-      value = iterator.next().getValue();
-      accumulator = keyedCombineFn.addInput(key, accumulator, value);
+    WindowedValue<KV<K, InputT>> currentValue = iterator.next();
+    K key = currentValue.getValue().getKey();
+    BoundedWindow currentWindow = 
Iterables.getFirst(currentValue.getWindows(), null);
+    InputT firstValue = currentValue.getValue().getValue();
+    processContext = processContext.forWindowedValue(currentValue);
+    AccumT accumulator = combineFnRunner.createAccumulator(key, 
processContext);
+    accumulator = combineFnRunner.addInput(key, accumulator, firstValue, 
processContext);
+
+    // we use this to keep track of the timestamps assigned by the OutputTimeFn
+    Instant windowTimestamp =
+        outputTimeFn.assignOutputTime(currentValue.getTimestamp(), 
currentWindow);
+
+    while (iterator.hasNext()) {
+      WindowedValue<KV<K, InputT>> nextValue = iterator.next();
+      BoundedWindow nextWindow = 
Iterables.getOnlyElement(nextValue.getWindows());
+
+      if (nextWindow.equals(currentWindow)) {
+        // continue accumulating
+        InputT value = nextValue.getValue().getValue();
+        processContext = processContext.forWindowedValue(nextValue);
+        accumulator = combineFnRunner.addInput(key, accumulator, value, 
processContext);
+
+        windowTimestamp = outputTimeFn.combine(
+            windowTimestamp,
+            outputTimeFn.assignOutputTime(nextValue.getTimestamp(), 
currentWindow));
+
+      } else {
+        // emit the value that we currently have
+        out.collect(
+            WindowedValue.of(
+                KV.of(key, accumulator),
+                windowTimestamp,
+                currentWindow,
+                PaneInfo.NO_FIRING));
+
+        currentWindow = nextWindow;
+        InputT value = nextValue.getValue().getValue();
+        processContext = processContext.forWindowedValue(nextValue);
+        accumulator = combineFnRunner.createAccumulator(key, processContext);
+        accumulator = combineFnRunner.addInput(key, accumulator, value, 
processContext);
+        windowTimestamp = 
outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
+      }
     }
 
-    out.collect(KV.of(key, accumulator));
+    // emit the final accumulator
+    out.collect(
+        WindowedValue.of(
+            KV.of(key, accumulator),
+            windowTimestamp,
+            currentWindow,
+            PaneInfo.NO_FIRING));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
new file mode 100644
index 0000000..0f1885c
--- /dev/null
+++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import 
org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Iterables;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * {@link org.apache.beam.sdk.transforms.DoFn.ProcessContext} for our Flink 
Wrappers.
+ */
+class FlinkProcessContext<InputT, OutputT>
+    extends DoFn<InputT, OutputT>.ProcessContext {
+
+  private final PipelineOptions pipelineOptions;
+  private final RuntimeContext runtimeContext;
+  private Collector<WindowedValue<OutputT>> collector;
+  private final boolean requiresWindowAccess;
+
+  protected WindowedValue<InputT> windowedValue;
+
+  protected WindowingStrategy<?, ?> windowingStrategy;
+
+  private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+  FlinkProcessContext(
+      PipelineOptions pipelineOptions,
+      RuntimeContext runtimeContext,
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Collector<WindowedValue<OutputT>> collector,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
+    doFn.super();
+    Preconditions.checkNotNull(pipelineOptions);
+    Preconditions.checkNotNull(runtimeContext);
+    Preconditions.checkNotNull(doFn);
+    Preconditions.checkNotNull(collector);
+
+    this.pipelineOptions = pipelineOptions;
+    this.runtimeContext = runtimeContext;
+    this.collector = collector;
+    this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess;
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputs = sideInputs;
+
+    super.setupDelegateAggregators();
+  }
+
+  FlinkProcessContext(
+      PipelineOptions pipelineOptions,
+      RuntimeContext runtimeContext,
+      DoFn<InputT, OutputT> doFn,
+      WindowingStrategy<?, ?> windowingStrategy,
+      Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
+    doFn.super();
+    Preconditions.checkNotNull(pipelineOptions);
+    Preconditions.checkNotNull(runtimeContext);
+    Preconditions.checkNotNull(doFn);
+
+    this.pipelineOptions = pipelineOptions;
+    this.runtimeContext = runtimeContext;
+    this.collector = null;
+    this.requiresWindowAccess = doFn instanceof DoFn.RequiresWindowAccess;
+    this.windowingStrategy = windowingStrategy;
+    this.sideInputs = sideInputs;
+
+    super.setupDelegateAggregators();
+  }
+
+  public FlinkProcessContext<InputT, OutputT> forOutput(
+      Collector<WindowedValue<OutputT>> collector) {
+    this.collector = collector;
+
+    // for now, returns ourselves, to be easy on the GC
+    return this;
+  }
+
+
+
+  public FlinkProcessContext<InputT, OutputT> forWindowedValue(
+      WindowedValue<InputT> windowedValue) {
+    this.windowedValue = windowedValue;
+
+    // for now, returns ourselves, to be easy on the GC
+    return this;
+  }
+
+  @Override
+  public InputT element() {
+    return this.windowedValue.getValue();
+  }
+
+
+  @Override
+  public Instant timestamp() {
+    return windowedValue.getTimestamp();
+  }
+
+  @Override
+  public BoundedWindow window() {
+    if (!requiresWindowAccess) {
+      throw new UnsupportedOperationException(
+          "window() is only available in the context of a DoFn marked as 
RequiresWindow.");
+    }
+    return Iterables.getOnlyElement(windowedValue.getWindows());
+  }
+
+  @Override
+  public PaneInfo pane() {
+    return windowedValue.getPane();
+  }
+
+  @Override
+  public WindowingInternals<InputT, OutputT> windowingInternals() {
+
+    return new WindowingInternals<InputT, OutputT>() {
+
+      @Override
+      public StateInternals stateInternals() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public void outputWindowedValue(
+          OutputT value,
+          Instant timestamp,
+          Collection<? extends BoundedWindow> windows,
+          PaneInfo pane) {
+        collector.collect(WindowedValue.of(value, timestamp, windows, pane));
+        outputWithTimestampAndWindow(value, timestamp, windows, pane);
+      }
+
+      @Override
+      public TimerInternals timerInternals() {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public Collection<? extends BoundedWindow> windows() {
+        return windowedValue.getWindows();
+      }
+
+      @Override
+      public PaneInfo pane() {
+        return windowedValue.getPane();
+      }
+
+      @Override
+      public <T> void writePCollectionViewData(TupleTag<?> tag,
+          Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws 
IOException {
+        throw new UnsupportedOperationException();
+      }
+
+      @Override
+      public <ViewT> ViewT sideInput(
+          PCollectionView<ViewT> view,
+          BoundedWindow mainInputWindow) {
+
+        Preconditions.checkNotNull(view, "View passed to sideInput cannot be 
null");
+        Preconditions.checkNotNull(
+            sideInputs.get(view),
+            "Side input for " + view + " not available.");
+
+        // get the side input strategy for mapping the window
+        WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
+
+        BoundedWindow sideInputWindow =
+            
windowingStrategy.getWindowFn().getSideInputWindow(mainInputWindow);
+
+        Map<BoundedWindow, ViewT> sideInputs =
+            runtimeContext.getBroadcastVariableWithInitializer(
+                view.getTagInternal().getId(), new 
SideInputInitializer<>(view));
+        return sideInputs.get(sideInputWindow);
+      }
+    };
+  }
+
+  @Override
+  public PipelineOptions getPipelineOptions() {
+    return pipelineOptions;
+  }
+
+  @Override
+  public <ViewT> ViewT sideInput(PCollectionView<ViewT> view) {
+    Preconditions.checkNotNull(view, "View passed to sideInput cannot be 
null");
+    Preconditions.checkNotNull(sideInputs.get(view), "Side input for " + view 
+ " not available.");
+    Iterator<? extends BoundedWindow> windowIter = 
windowedValue.getWindows().iterator();
+    BoundedWindow window;
+    if (!windowIter.hasNext()) {
+      throw new IllegalStateException(
+          "sideInput called when main input element is not in any windows");
+    } else {
+      window = windowIter.next();
+      if (windowIter.hasNext()) {
+        throw new IllegalStateException(
+            "sideInput called when main input element is in multiple windows");
+      }
+    }
+
+    // get the side input strategy for mapping the window
+    WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
+
+    BoundedWindow sideInputWindow =
+        windowingStrategy.getWindowFn().getSideInputWindow(window);
+
+    Map<BoundedWindow, ViewT> sideInputs =
+        runtimeContext.getBroadcastVariableWithInitializer(
+            view.getTagInternal().getId(), new SideInputInitializer<>(view));
+    ViewT result = sideInputs.get(sideInputWindow);
+    if (result == null) {
+      result = 
view.fromIterableInternal(Collections.<WindowedValue<?>>emptyList());
+    }
+    return result;
+  }
+
+  @Override
+  public void output(OutputT value) {
+    if (windowedValue != null) {
+      outputWithTimestamp(value, windowedValue.getTimestamp());
+    } else {
+      outputWithTimestamp(value, null);
+    }
+  }
+
+  @Override
+  public void outputWithTimestamp(OutputT value, Instant timestamp) {
+    if (windowedValue == null) {
+      // we are in startBundle() or finishBundle()
+
+      try {
+        Collection windows = windowingStrategy.getWindowFn().assignWindows(
+            new FlinkNoElementAssignContext(
+                windowingStrategy.getWindowFn(),
+                value,
+                timestamp));
+
+        collector.collect(
+            WindowedValue.of(
+                value,
+                timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
+                windows,
+                PaneInfo.NO_FIRING));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    } else {
+      collector.collect(
+          WindowedValue.of(
+              value,
+              timestamp,
+              windowedValue.getWindows(),
+              windowedValue.getPane()));
+    }
+  }
+
+  protected void outputWithTimestampAndWindow(
+      OutputT value,
+      Instant timestamp,
+      Collection<? extends BoundedWindow> windows,
+      PaneInfo pane) {
+    collector.collect(
+        WindowedValue.of(
+            value, timestamp, windows, pane));
+  }
+
+  @Override
+  public <T> void sideOutput(TupleTag<T> tag, T output) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+    sideOutput(tag, output);
+  }
+
+  @Override
+  protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+  createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, 
AggOutputT> combiner) {
+    SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
+        new SerializableFnAggregatorWrapper<>(combiner);
+    runtimeContext.addAccumulator(name, wrapper);
+    return wrapper;
+  }
+}

Reply via email to