BEAM-261 PCollectionView and side inputs.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09754942
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09754942
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09754942

Branch: refs/heads/apex-runner
Commit: 09754942c66c9befffc8df9b3c8a75b819a672e6
Parents: 074b18f
Author: Thomas Weise <t...@apache.org>
Authored: Sun Sep 25 16:46:44 2016 -0700
Committer: Thomas Weise <t...@apache.org>
Committed: Sun Oct 16 23:25:55 2016 -0700

----------------------------------------------------------------------
 .../beam/runners/apex/ApexPipelineOptions.java  |   6 +
 .../runners/apex/ApexPipelineTranslator.java    |  19 +-
 .../apache/beam/runners/apex/ApexRunner.java    | 397 ++++++++++++++++++-
 .../FlattenPCollectionTranslator.java           |  26 +-
 .../apex/translators/ParDoBoundTranslator.java  |  22 +-
 .../apex/translators/TranslationContext.java    |  14 +-
 .../functions/ApexFlattenOperator.java          | 113 ++++++
 .../functions/ApexGroupByKeyOperator.java       |  78 +++-
 .../functions/ApexParDoOperator.java            | 210 ++++++++--
 .../io/ApexReadUnboundedInputOperator.java      |  31 +-
 .../apex/translators/utils/ApexStreamTuple.java |  11 +
 .../translators/utils/NoOpSideInputReader.java  |  47 ---
 .../beam/runners/apex/examples/IntTest.java     | 133 +++++++
 .../beam/runners/apex/examples/IntTests.java    | 207 ----------
 .../translators/ParDoBoundTranslatorTest.java   |  37 +-
 .../apex/src/test/resources/log4j.properties    |   4 +-
 16 files changed, 1028 insertions(+), 327 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
index f70d24c..141a8c1 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
@@ -50,6 +50,12 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab
   @Default.Boolean(true)
   boolean isEmbeddedExecutionDebugMode();
 
+  @Description("output data received and emitted on ports (for debugging)")
+  void setTupleTracingEnabled(boolean enabled);
+
+  @Default.Boolean(false)
+  boolean isTupleTracingEnabled();
+
   @Description("how long the client should wait for the pipeline to run")
   void setRunMillis(long runMillis);
 
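The new option pair follows the standard Beam PipelineOptions pattern: a getter carrying the @Default annotation plus a matching setter, proxied by PipelineOptionsFactory. A minimal usage sketch (illustrative only, not part of this commit):

    ApexPipelineOptions opts =
        org.apache.beam.sdk.options.PipelineOptionsFactory.create()
            .as(ApexPipelineOptions.class);
    // log tuples received and emitted on operator ports while debugging
    opts.setTupleTracingEnabled(true);
    // let the embedded cluster run for ten seconds before shutdown
    opts.setRunMillis(10000);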

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index b0391b4..ad8c283 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -18,6 +18,7 @@
 
 package org.apache.beam.runners.apex;
 
+import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView;
 import org.apache.beam.runners.apex.translators.CreateValuesTranslator;
 import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator;
 import org.apache.beam.runners.apex.translators.GroupByKeyTranslator;
@@ -35,8 +36,8 @@ import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PValue;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,6 +72,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
     registerTransformTranslator(Flatten.FlattenPCollectionList.class,
         new FlattenPCollectionTranslator());
     registerTransformTranslator(Create.Values.class, new CreateValuesTranslator());
+    registerTransformTranslator(CreateApexPCollectionView.class, new CreatePCollectionViewTranslator());
   }
 
   public ApexPipelineTranslator(TranslationContext translationContext) {
@@ -98,7 +100,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
     PTransform transform = node.getTransform();
     TransformTranslator translator = getTransformTranslator(transform.getClass());
     if (null == translator) {
-      throw new IllegalStateException(
+      throw new UnsupportedOperationException(
           "no translator registered for " + transform);
     }
     translationContext.setCurrentTransform(node);
@@ -147,4 +149,17 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
 
   }
 
+  private static class CreatePCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>>
+  {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void translate(CreateApexPCollectionView<ElemT, ViewT> transform, TranslationContext context)
+    {
+      PCollectionView<ViewT> view = transform.getView();
+      context.addView(view);
+      LOG.debug("view {}", view.getName());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 5fa3f23..ae79a20 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -19,20 +19,36 @@ package org.apache.beam.runners.apex;
 
 import static com.google.common.base.Preconditions.checkArgument;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.beam.runners.apex.translators.TranslationContext;
 import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.runners.core.AssignWindows;
+import org.apache.beam.sdk.util.PCollectionViews;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.hadoop.conf.Configuration;
@@ -55,6 +71,13 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
   private final ApexPipelineOptions options;
 
+  /**
+   * TODO: this isn't thread safe.
+   * Holds the most recent assertion error that was raised while processing elements.
+   * Used by the unit test driver in embedded mode to propagate the exception.
+   */
+  public static volatile AssertionError assertionError;
+
   public ApexRunner(ApexPipelineOptions options) {
     this.options = options;
   }
@@ -77,6 +100,32 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
               input.getPipeline(),
               WindowingStrategy.globalDefault(),
               PCollection.IsBounded.BOUNDED);
+    // TODO: replace this with a mapping
+    } else if (Combine.GloballyAsSingletonView.class.equals(transform.getClass())) {
+      PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingCombineGloballyAsSingletonView<InputT, OutputT>(this,
+          (Combine.GloballyAsSingletonView)transform);
+      return Pipeline.applyTransform(input, customTransform);
+    } else if (View.AsSingleton.class.equals(transform.getClass())) {
+      // note this assumes presence of above Combine.GloballyAsSingletonView mapping
+      PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsSingleton<InputT>(this,
+          (View.AsSingleton)transform);
+      return Pipeline.applyTransform(input, customTransform);
+    } else if (View.AsIterable.class.equals(transform.getClass())) {
+      PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsIterable<InputT>(this,
+          (View.AsIterable)transform);
+      return Pipeline.applyTransform(input, customTransform);
+    } else if (View.AsList.class.equals(transform.getClass())) {
+      PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsList<InputT>(this,
+          (View.AsList)transform);
+      return Pipeline.applyTransform(input, customTransform);
+    } else if (View.AsMap.class.equals(transform.getClass())) {
+      PTransform<InputT, OutputT> customTransform = new StreamingViewAsMap(this,
+          (View.AsMap)transform);
+      return Pipeline.applyTransform(input, customTransform);
+    } else if (View.AsMultimap.class.equals(transform.getClass())) {
+      PTransform<InputT, OutputT> customTransform = new StreamingViewAsMultimap(this,
+          (View.AsMultimap)transform);
+      return Pipeline.applyTransform(input, customTransform);
     } else {
       return super.apply(transform, input);
     }
@@ -109,10 +158,19 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
         // turns off timeout checking for operator progress
         lc.setHeartbeatMonitoringEnabled(false);
       }
+      assertionError = null;
+      lc.runAsync();
       if (options.getRunMillis() > 0) {
-        lc.run(options.getRunMillis());
-      } else {
-        lc.runAsync();
+        try {
+          long timeout = System.currentTimeMillis() + options.getRunMillis();
+          while (System.currentTimeMillis() < timeout) {
+            if (assertionError != null) {
+              throw assertionError;
+            }
+          }
+        } finally {
+          lc.shutdown();
+        }
       }
       return new ApexRunnerResult(lma.getDAG(), lc);
     } catch (Exception e) {
@@ -158,10 +216,343 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
   }
 
   private static class IdentityFn<T> extends DoFn<T, T> {
+    private static final long serialVersionUID = 1L;
     @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element());
     }
   }
 
+////////////////////////////////////////////
+// Adapted from FlinkRunner for View support
+
+  /**
+   * Records that the {@link PTransform} requires a deterministic key coder.
+   */
+  private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Creates a primitive {@link PCollectionView}.
+   *
+   * <p>For internal use only by runner implementors.
+   *
+   * @param <ElemT> The type of the elements of the input PCollection
+   * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input
+   */
+  public static class CreateApexPCollectionView<ElemT, ViewT>
+      extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> {
+    private PCollectionView<ViewT> view;
+
+    private CreateApexPCollectionView(PCollectionView<ViewT> view) {
+      this.view = view;
+    }
+
+    public static <ElemT, ViewT> CreateApexPCollectionView<ElemT, ViewT> of(
+        PCollectionView<ViewT> view) {
+      return new CreateApexPCollectionView<>(view);
+    }
+
+    public PCollectionView<ViewT> getView() {
+      return view;
+    }
+
+    @Override
+    public PCollectionView<ViewT> apply(PCollection<List<ElemT>> input) {
+      return view;
+    }
+  }
+
+  private static class WrapAsList<T> extends OldDoFn<T, List<T>> {
+    @Override
+    public void processElement(ProcessContext c) {
+      c.output(Arrays.asList(c.element()));
+    }
+  }
+
+  private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT>
+      extends PTransform<PCollection<InputT>, PCollectionView<OutputT>>
+  {
+    Combine.GloballyAsSingletonView<InputT, OutputT> transform;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    public StreamingCombineGloballyAsSingletonView(ApexRunner runner,
+        Combine.GloballyAsSingletonView<InputT, OutputT> transform)
+    {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<OutputT> apply(PCollection<InputT> input)
+    {
+      PCollection<OutputT> combined = input
+          .apply(Combine.globally(transform.getCombineFn()).withoutDefaults().withFanout(transform.getFanout()));
+
+      PCollectionView<OutputT> view = PCollectionViews.singletonView(combined.getPipeline(),
+          combined.getWindowingStrategy(), transform.getInsertDefault(),
+          transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null, combined.getCoder());
+      return combined.apply(ParDo.of(new WrapAsList<OutputT>()))
+          .apply(CreateApexPCollectionView.<OutputT, OutputT> of(view));
+    }
+
+    @Override
+    protected String getKindString()
+    {
+      return "StreamingCombineGloballyAsSingletonView";
+    }
+  }
+
+  private static class StreamingViewAsSingleton<T> extends PTransform<PCollection<T>, PCollectionView<T>>
+  {
+    private static final long serialVersionUID = 1L;
+    private View.AsSingleton<T> transform;
+
+    public StreamingViewAsSingleton(ApexRunner runner, View.AsSingleton<T> transform)
+    {
+      this.transform = transform;
+    }
+
+    @Override
+    public PCollectionView<T> apply(PCollection<T> input)
+    {
+      Combine.Globally<T, T> combine = Combine
+          .globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue()));
+      if (!transform.hasDefaultValue()) {
+        combine = combine.withoutDefaults();
+      }
+      return input.apply(combine.asSingletonView());
+    }
+
+    @Override
+    protected String getKindString()
+    {
+      return "StreamingViewAsSingleton";
+    }
+
+    private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T>
+    {
+      private boolean hasDefaultValue;
+      private T defaultValue;
+
+      SingletonCombine(boolean hasDefaultValue, T defaultValue)
+      {
+        this.hasDefaultValue = hasDefaultValue;
+        this.defaultValue = defaultValue;
+      }
+
+      @Override
+      public T apply(T left, T right)
+      {
+        throw new IllegalArgumentException("PCollection with more than one element "
+            + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to "
+            + "combine the PCollection into a single value");
+      }
+
+      @Override
+      public T identity()
+      {
+        if (hasDefaultValue) {
+          return defaultValue;
+        } else {
+          throw new IllegalArgumentException("Empty PCollection accessed as a 
singleton view. "
+              + "Consider setting withDefault to provide a default value");
+        }
+      }
+    }
+  }
+
+  private static class StreamingViewAsMap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> {
+
+    private final ApexRunner runner;
+
+    @SuppressWarnings("unused") // used via reflection in ApexRunner#apply()
+    public StreamingViewAsMap(ApexRunner runner, View.AsMap<K, V> transform) {
+      this.runner = runner;
+    }
+
+    @Override
+    public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, V>> view =
+          PCollectionViews.mapView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateApexPCollectionView.<KV<K, V>, Map<K, V>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMap";
+    }
+  }
+
+  /**
+   * Specialized expansion for {@link
+   * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the
+   * Apex runner in streaming mode.
+   */
+  private static class StreamingViewAsMultimap<K, V>
+      extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> {
+
+    private final ApexRunner runner;
+
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in ApexRunner#apply()
+    public StreamingViewAsMultimap(ApexRunner runner, View.AsMultimap<K, V> transform) {
+      this.runner = runner;
+    }
+
+    @Override
+    public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) {
+      PCollectionView<Map<K, Iterable<V>>> view =
+          PCollectionViews.multimapView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      @SuppressWarnings({"rawtypes", "unchecked"})
+      KvCoder<K, V> inputCoder = (KvCoder) input.getCoder();
+      try {
+        inputCoder.getKeyCoder().verifyDeterministic();
+      } catch (Coder.NonDeterministicException e) {
+        runner.recordViewUsesNonDeterministicKeyCoder(this);
+      }
+
+      return input
+          .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults())
+          .apply(CreateApexPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsMultimap";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the
+   * Apex runner in streaming mode.
+   */
+  private static class StreamingViewAsList<T>
+      extends PTransform<PCollection<T>, PCollectionView<List<T>>> {
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in ApexRunner#apply()
+    public StreamingViewAsList(ApexRunner runner, View.AsList<T> transform) {}
+
+    @Override
+    public PCollectionView<List<T>> apply(PCollection<T> input) {
+      PCollectionView<List<T>> view =
+          PCollectionViews.listView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateApexPCollectionView.<T, List<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsList";
+    }
+  }
+
+  /**
+   * Specialized implementation for
+   * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the
+   * Apex runner in streaming mode.
+   */
+  private static class StreamingViewAsIterable<T>
+      extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> {
+    /**
+     * Builds an instance of this class from the overridden transform.
+     */
+    @SuppressWarnings("unused") // used via reflection in ApexRunner#apply()
+    public StreamingViewAsIterable(ApexRunner runner, View.AsIterable<T> transform) { }
+
+    @Override
+    public PCollectionView<Iterable<T>> apply(PCollection<T> input) {
+      PCollectionView<Iterable<T>> view =
+          PCollectionViews.iterableView(
+              input.getPipeline(),
+              input.getWindowingStrategy(),
+              input.getCoder());
+
+      return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults())
+          .apply(CreateApexPCollectionView.<T, Iterable<T>>of(view));
+    }
+
+    @Override
+    protected String getKindString() {
+      return "StreamingViewAsIterable";
+    }
+  }
+
+  /**
+   * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs.
+   *
+   * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap},
+   * {@link StreamingViewAsList} and {@link StreamingViewAsIterable}.
+   * They require that the input {@link PCollection} fit in memory.
+   * For a large {@link PCollection} this is expected to crash!
+   *
+   * @param <T> the type of elements to concatenate.
+   */
+  private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> {
+    @Override
+    public List<T> createAccumulator() {
+      return new ArrayList<T>();
+    }
+
+    @Override
+    public List<T> addInput(List<T> accumulator, T input) {
+      accumulator.add(input);
+      return accumulator;
+    }
+
+    @Override
+    public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
+      List<T> result = createAccumulator();
+      for (List<T> accumulator : accumulators) {
+        result.addAll(accumulator);
+      }
+      return result;
+    }
+
+    @Override
+    public List<T> extractOutput(List<T> accumulator) {
+      return accumulator;
+    }
+
+    @Override
+    public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+
+    @Override
+    public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
+      return ListCoder.of(inputCoder);
+    }
+      return ListCoder.of(inputCoder);
+    }
+  }
+
 }
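
Taken together, these overrides give the Apex runner working side inputs: each View transform is rewritten into a Concatenate-based Combine feeding CreateApexPCollectionView, which the pipeline translator maps onto a view stream. A minimal sketch of a pipeline this enables (hypothetical example assuming a Pipeline p; not part of this commit):

    PCollection<Integer> main = p.apply(Create.of(1, 2, 3));
    // rewritten by ApexRunner.apply() into StreamingCombineGloballyAsSingletonView
    final PCollectionView<Integer> sumView =
        main.apply(Sum.integersGlobally().asSingletonView());
    main.apply(ParDo.withSideInputs(sumView).of(new DoFn<Integer, Integer>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // the side input becomes available once the combine has fired
        c.output(c.element() + c.sideInput(sumView));
      }
    }));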

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
index e153867..712466a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
@@ -18,19 +18,23 @@
 
 package org.apache.beam.runners.apex.translators;
 
+import java.util.Collections;
 import java.util.List;
 
+import org.apache.beam.runners.apex.translators.functions.ApexFlattenOperator;
+import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translators.io.ValuesSource;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 
-import com.datatorrent.lib.stream.StreamMerger;
 import com.google.common.collect.Lists;
 
 /**
- * Flatten.FlattenPCollectionList translation to Apex operator.
- * TODO: support more than two streams
+ * {@link Flatten.FlattenPCollectionList} translation to Apex operator.
  */
 public class FlattenPCollectionTranslator<T> implements
     TransformTranslator<Flatten.FlattenPCollectionList<T>> {
@@ -38,16 +42,28 @@ public class FlattenPCollectionTranslator<T> implements
 
   @Override
  public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) {
-    PCollection<T> firstCollection = null;
     PCollectionList<T> input = context.getInput();
     List<PCollection<T>> collections = input.getAll();
+
+    if (collections.isEmpty()) {
+      // create a dummy source that never emits anything
+      @SuppressWarnings("unchecked")
+      UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(Collections.EMPTY_LIST,
+          (Coder<T>) VoidCoder.of());
+      ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
+          unboundedSource, context.getPipelineOptions());
+      context.addOperator(operator, operator.output);
+      return;
+    }
+
     List<PCollection<T>> remainingCollections = Lists.newArrayList();
+    PCollection<T> firstCollection = null;
     while (!collections.isEmpty()) {
       for (PCollection<T> collection : collections) {
         if (null == firstCollection) {
           firstCollection = collection;
         } else {
-          StreamMerger<T> operator = new StreamMerger<>();
+          ApexFlattenOperator<T> operator = new ApexFlattenOperator<>();
           context.addStream(firstCollection, operator.data1);
           context.addStream(collection, operator.data2);
           if (collections.size() > 2) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
index a958234..632829a 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java
@@ -18,11 +18,15 @@
 
 package org.apache.beam.runners.apex.translators;
 
+import java.util.List;
+
 import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
-import org.apache.beam.runners.apex.translators.utils.NoOpSideInputReader;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
+
+import com.datatorrent.api.Operator;
 
 /**
 * {@link ParDo.Bound} is translated to an Apex operator that wraps the {@link DoFn}
@@ -35,9 +39,23 @@ public class ParDoBoundTranslator<InputT, OutputT> implements
  public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) {
     OldDoFn<InputT, OutputT> doFn = transform.getFn();
     PCollection<OutputT> output = context.getOutput();
+    List<PCollectionView<?>> sideInputs = transform.getSideInputs();
    ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(),
-        doFn, output.getWindowingStrategy(), new NoOpSideInputReader());
+        doFn, output.getWindowingStrategy(), sideInputs);
     context.addOperator(operator, operator.output);
     context.addStream(context.getInput(), operator.input);
+    if (!sideInputs.isEmpty()) {
+      Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1};
+      for (int i = 0; i < sideInputs.size(); i++) {
+        // the number of input ports for side inputs is fixed and each port can only take one input.
+        // more (optional) ports can be added to give reasonable capacity, or an explicit union operation introduced.
+        if (i == sideInputPorts.length) {
+          String msg = String.format("Too many side inputs in %s (currently only supporting %s).",
+              transform.toString(), sideInputPorts.length);
+          throw new UnsupportedOperationException(msg);
+        }
+        context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]);
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
index ab7cd0a..163cfd4 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
 import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -54,6 +55,17 @@ public class TranslationContext {
   private AppliedPTransform<?, ?, ?> currentTransform;
  private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>();
   private final Map<String, Operator> operators = new HashMap<>();
+  private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>();
+
+  public void addView(PCollectionView<?> view) {
+    this.viewInputs.put(view, this.getInput());
+  }
+
+  public <InputT extends PInput> InputT getViewInput(PCollectionView<?> view) {
+    PInput input = this.viewInputs.get(view);
+    checkArgument(input != null, "unknown view " + view.getName());
+    return (InputT)input;
+  }
 
   public TranslationContext(ApexPipelineOptions pipelineOptions) {
     this.pipelineOptions = pipelineOptions;
@@ -102,7 +114,7 @@ public class TranslationContext {
 
   public void addStream(PInput input, InputPort inputPort) {
     Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input);
-    checkArgument(stream != null, "no upstream operator defined");
+    checkArgument(stream != null, "no upstream operator defined for %s", input);
     stream.getRight().add(inputPort);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
new file mode 100644
index 0000000..ce27abb
--- /dev/null
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.apex.translators.functions;
+
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.WatermarkTuple;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Apex operator for Beam {@link Flatten.FlattenPCollectionList}.
+ */
+public class ApexFlattenOperator<InputT> extends BaseOperator
+{
+  private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class);
+  private boolean traceTuples = true;
+
+  private long inputWM1;
+  private long inputWM2;
+  private long outputWM;
+
+  /**
+   * Data input port 1.
+   */
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
+  {
+    /**
+     * Emits to port "out"
+     */
+    @Override
+    public void process(ApexStreamTuple<WindowedValue<InputT>> tuple)
+    {
+      if (tuple instanceof WatermarkTuple) {
+        WatermarkTuple<?> wmTuple = (WatermarkTuple<?>)tuple;
+        if (wmTuple.getTimestamp() > inputWM1) {
+          inputWM1 = wmTuple.getTimestamp();
+          if (inputWM1 <= inputWM2) {
+            // move output watermark and emit it
+            outputWM = inputWM1;
+            if (traceTuples) {
+              LOG.debug("\nemitting watermark {}\n", outputWM);
+            }
+            out.emit(tuple);
+          }
+        }
+        return;
+      }
+      if (traceTuples) {
+        LOG.debug("\nemitting {}\n", tuple);
+      }
+      out.emit(tuple);
+    }
+  };
+
+  /**
+   * Data input port 2.
+   */
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
+  {
+    /**
+     * Emits to port "out"
+     */
+    @Override
+    public void process(ApexStreamTuple<WindowedValue<InputT>> tuple)
+    {
+      if (tuple instanceof WatermarkTuple) {
+        WatermarkTuple<?> wmTuple = (WatermarkTuple<?>)tuple;
+        if (wmTuple.getTimestamp() > inputWM2) {
+          inputWM2 = wmTuple.getTimestamp();
+          if (inputWM2 <= inputWM1) {
+            // move output watermark and emit it
+            outputWM = inputWM2;
+            if (traceTuples) {
+              LOG.debug("\nemitting watermark {}\n", outputWM);
+            }
+            out.emit(tuple);
+          }
+        }
+        return;
+      }
+      if (traceTuples) {
+        LOG.debug("\nemitting {}\n", tuple);
+      }
+      out.emit(tuple);
+    }
+  };
+
+  /**
+   * Output port.
+   */
+  public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out = new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>();
+}
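
The operator holds its output watermark back to the slower of the two inputs, so downstream time never regresses. Condensed to the underlying rule (illustrative sketch, not the operator's literal code; symmetric for input 2):

    // on a watermark tuple from input 1
    if (wmTimestamp > inputWM1) {
      inputWM1 = wmTimestamp;
      if (inputWM1 <= inputWM2) {
        // input 1 is now the slower (or equal) side, so the minimum advanced
        outputWM = inputWM1;
        out.emit(tuple);  // forward the watermark on port "out"
      }
    }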

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
index 29e1b32..5970f36 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.apex.translators.functions;
 
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -33,6 +34,7 @@ import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOption
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Aggregator;
@@ -41,6 +43,7 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.KeyedWorkItems;
 import org.apache.beam.sdk.util.TimerInternals;
@@ -55,6 +58,8 @@ import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
@@ -79,16 +84,22 @@ import com.google.common.collect.Multimap;
  */
 public class ApexGroupByKeyOperator<K, V> implements Operator
 {
+  private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class);
+  private boolean traceTuples = true;
+
   @Bind(JavaSerializer.class)
   private WindowingStrategy<V, BoundedWindow> windowingStrategy;
   @Bind(JavaSerializer.class)
+  private Coder<K> keyCoder;
+  @Bind(JavaSerializer.class)
   private Coder<V> valueCoder;
 
   @Bind(JavaSerializer.class)
   private final SerializablePipelineOptions serializedOptions;
   @Bind(JavaSerializer.class)
-  private Map<K, StateInternals<K>> perKeyStateInternals = new HashMap<>();
-  private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
+  // TODO: InMemoryStateInternals not serializable
+  private transient Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>();
+  private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>();
 
   private transient ProcessContext context;
   private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn;
@@ -100,14 +111,19 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
     @Override
     public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t)
     {
-      //System.out.println("\n***RECEIVED: " +t);
       try {
         if (t instanceof ApexStreamTuple.WatermarkTuple) {
          ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>)t;
           processWatermark(mark);
+          if (traceTuples) {
+            LOG.debug("\nemitting watermark {}\n", mark.getTimestamp());
+          }
          output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(mark.getTimestamp()));
           return;
         }
+        if (traceTuples) {
+          LOG.debug("\ninput {}\n", t.getValue());
+        }
         processElement(t.getValue());
       } catch (Exception e) {
         Throwables.propagate(e);
@@ -124,6 +140,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
     Preconditions.checkNotNull(pipelineOptions);
     this.serializedOptions = new SerializablePipelineOptions(pipelineOptions);
    this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>)input.getWindowingStrategy();
+    this.keyCoder = ((KvCoder<K, V>)input.getCoder()).getKeyCoder();
     this.valueCoder = ((KvCoder<K, V>)input.getCoder()).getValueCoder();
   }
 
@@ -146,6 +163,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
   @Override
   public void setup(OperatorContext context)
   {
+    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this);
    StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory();
    this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy, stateInternalsFactory,
        SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder));
@@ -163,16 +181,16 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
    * We keep these timers in a Set, so that they are deduplicated, as the same
    * timer can be registered multiple times.
    */
-  private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
+  private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) {
 
     // we keep the timers to return in a different list and launch them later
     // because we cannot prevent a trigger from registering another trigger,
     // which would lead to concurrent modification exception.
-    Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create();
+    Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create();
 
-    Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
+    Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator();
     while (it.hasNext()) {
-      Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
+      Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next();
 
      Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator();
       while (timerIt.hasNext()) {
@@ -205,44 +223,64 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
     fn.processElement(context);
   }
 
-  private StateInternals<K> getStateInternalsForKey(K key) {
-    StateInternals<K> stateInternals = perKeyStateInternals.get(key);
+  private StateInternals<K> getStateInternalsForKey(K key)
+  {
+    final ByteBuffer keyBytes;
+    try {
+      keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+    } catch (CoderException e) {
+      throw Throwables.propagate(e);
+    }
+    StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes);
     if (stateInternals == null) {
      //Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder();
      //OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn();
       stateInternals = InMemoryStateInternals.forKey(key);
-      perKeyStateInternals.put(key, stateInternals);
+      perKeyStateInternals.put(keyBytes, stateInternals);
     }
     return stateInternals;
   }
 
   private void registerActiveTimer(K key, TimerInternals.TimerData timer) {
-    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+    final ByteBuffer keyBytes;
+    try {
+      keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+    } catch (CoderException e) {
+      throw Throwables.propagate(e);
+    }
+    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
     if (timersForKey == null) {
       timersForKey = new HashSet<>();
     }
     timersForKey.add(timer);
-    activeTimers.put(key, timersForKey);
+    activeTimers.put(keyBytes, timersForKey);
   }
 
   private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) {
-    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key);
+    final ByteBuffer keyBytes;
+    try {
+      keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
+    } catch (CoderException e) {
+      throw Throwables.propagate(e);
+    }
+    Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes);
     if (timersForKey != null) {
       timersForKey.remove(timer);
       if (timersForKey.isEmpty()) {
-        activeTimers.remove(key);
+        activeTimers.remove(keyBytes);
       } else {
-        activeTimers.put(key, timersForKey);
+        activeTimers.put(keyBytes, timersForKey);
       }
     }
   }
 
  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception {
     this.inputWatermark = new Instant(mark.getTimestamp());
-    Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
+    Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp());
     if (!timers.isEmpty()) {
-      for (K key : timers.keySet()) {
-        KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(key));
+      for (ByteBuffer keyBytes : timers.keySet()) {
+        K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array());
+        KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes));
         context.setElement(kwi, getStateInternalsForKey(kwi.key()));
         fn.processElement(context);
       }
@@ -315,7 +353,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator
 
        @Override
        public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-          System.out.println("\n***EMITTING: " + output + ", timestamp=" + timestamp);
+          if (traceTuples) {
+            LOG.debug("\nemitting {} timestamp {}\n", output, timestamp);
+          }
+          ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane)));
         }
 
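Keys are now encoded to a ByteBuffer before use as map keys: arbitrary user key types need not implement value-based equals()/hashCode(), while coder-produced bytes always compare structurally. The recurring pattern, extracted as a sketch (helper name hypothetical; calls as in the diff):

    static <K> ByteBuffer structuralKey(Coder<K> keyCoder, K key) {
      try {
        // encode the key so equal values yield equal bytes
        return ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key));
      } catch (CoderException e) {
        throw Throwables.propagate(e);
      }
    }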

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
index d358d14..13a8fc9 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
@@ -18,40 +18,58 @@
 
 package org.apache.beam.runners.apex.translators.functions;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translators.utils.NoOpStepContext;
import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.PushbackSideInputDoFnRunner;
+import org.apache.beam.runners.core.SideInputHandler;
 import org.apache.beam.runners.core.DoFnRunners.OutputManager;
+import org.apache.beam.sdk.repackaged.com.google.common.base.Throwables;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.util.ExecutionContext;
+import org.apache.beam.sdk.util.NullSideInputReader;
 import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.InMemoryStateInternals;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.annotation.InputPortFieldAnnotation;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.common.util.BaseOperator;
 import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.google.common.collect.Iterables;
 import com.esotericsoftware.kryo.serializers.JavaSerializer;
 
 /**
  * Apex operator for Beam {@link DoFn}.
  */
public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager {
+  private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class);
+  private boolean traceTuples = true;
 
   private transient final TupleTag<OutputT> mainTag = new TupleTag<OutputT>();
-  private transient DoFnRunner<InputT, OutputT> doFnRunner;
+  private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner;
 
   @Bind(JavaSerializer.class)
   private final SerializablePipelineOptions pipelineOptions;
@@ -60,17 +78,28 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   @Bind(JavaSerializer.class)
   private final WindowingStrategy<?, ?> windowingStrategy;
   @Bind(JavaSerializer.class)
-  private final SideInputReader sideInputReader;
+  List<PCollectionView<?>> sideInputs;
+  // TODO: not Kryo serializable, integrate codec
+  //@Bind(JavaSerializer.class)
+  private transient StateInternals<Void> sideInputStateInternals = InMemoryStateInternals.forKey(null);
+  private transient SideInputHandler sideInputHandler;
+  // TODO: not Kryo serializable, integrate codec
+  private List<WindowedValue<InputT>> pushedBack = new ArrayList<>();
+  private LongMin pushedBackWatermark = new LongMin();
+  private long currentInputWatermark = Long.MIN_VALUE;
+  private long currentOutputWatermark = currentInputWatermark;
 
   public ApexParDoOperator(
       ApexPipelineOptions pipelineOptions,
       OldDoFn<InputT, OutputT> doFn,
       WindowingStrategy<?, ?> windowingStrategy,
-      SideInputReader sideInputReader) {
+      List<PCollectionView<?>> sideInputs
+      )
+  {
     this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions);
     this.doFn = doFn;
     this.windowingStrategy = windowingStrategy;
-    this.sideInputReader = sideInputReader;
+    this.sideInputs = sideInputs;
   }
 
   @SuppressWarnings("unused") // for Kryo
@@ -78,17 +107,60 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
     this(null, null, null, null);
   }
 
+
  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>()
   {
     @Override
     public void process(ApexStreamTuple<WindowedValue<InputT>> t)
     {
       if (t instanceof ApexStreamTuple.WatermarkTuple) {
-        output.emit(t);
+        processWatermark((ApexStreamTuple.WatermarkTuple<?>)t);
       } else {
-        System.out.println("\n" + Thread.currentThread().getName() + "\n" + t.getValue() + "\n");
-        doFnRunner.processElement(t.getValue());
+        if (traceTuples) {
+          LOG.debug("\ninput {}\n", t.getValue());
+        }
+        Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(t.getValue());
+        for (WindowedValue<InputT> pushedBackValue : justPushedBack) {
+          pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
+          pushedBack.add(pushedBackValue);
+        }
+      }
+    }
+  };
+
+  @InputPortFieldAnnotation(optional=true)
+  public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>()
+  {
+    private final int sideInputIndex = 0;
+
+    @Override
+    public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t)
+    {
+      if (t instanceof ApexStreamTuple.WatermarkTuple) {
+        // ignore side input watermarks
+        return;
+      }
+      if (traceTuples) {
+        LOG.debug("\nsideInput {}\n", t.getValue());
       }
+      PCollectionView<?> sideInput = sideInputs.get(sideInputIndex);
+      sideInputHandler.addSideInputValue(sideInput, t.getValue());
+
+      List<WindowedValue<InputT>> newPushedBack = new ArrayList<>();
+      for (WindowedValue<InputT> elem : pushedBack) {
+        Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(elem);
+        Iterables.addAll(newPushedBack, justPushedBack);
+      }
+
+      pushedBack.clear();
+      pushedBackWatermark.clear();
+      for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
+        pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis());
+        pushedBack.add(pushedBackValue);
+      }
+
+      // potentially emit watermark
+      processWatermark(ApexStreamTuple.WatermarkTuple.of(currentInputWatermark));
     }
   };
 
@@ -99,27 +171,82 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple)
   {
     output.emit(ApexStreamTuple.DataTuple.of(tuple));
+    if (traceTuples) {
+      LOG.debug("\nemitting {}\n", tuple);
+    }
+  }
+
+  private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) {
+    try {
+      return pushbackDoFnRunner.processElementInReadyWindows(elem);
+    } catch (UserCodeException ue) {
+      if (ue.getCause() instanceof AssertionError) {
+        ApexRunner.assertionError = (AssertionError)ue.getCause();
+      }
+      throw ue;
+    }
+  }
+
+  private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark)
+  {
+    this.currentInputWatermark = mark.getTimestamp();
+
+    if (sideInputs.isEmpty()) {
+      if (traceTuples) {
+        LOG.debug("\nemitting watermark {}\n", mark);
+      }
+      output.emit(mark);
+      return;
+    }
+
+    long potentialOutputWatermark =
+        Math.min(pushedBackWatermark.get(), currentInputWatermark);
+    if (potentialOutputWatermark > currentOutputWatermark) {
+      currentOutputWatermark = potentialOutputWatermark;
+      if (traceTuples) {
+        LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
+      }
+      output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
+    }
   }
 
   @Override
   public void setup(OperatorContext context)
   {
-    this.doFnRunner = DoFnRunners.simpleRunner(pipelineOptions.get(),
+    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
+    SideInputReader sideInputReader = NullSideInputReader.of(sideInputs);
+    if (!sideInputs.isEmpty()) {
+      sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals);
+      sideInputReader = sideInputHandler;
+    }
+
+    DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault(
+        pipelineOptions.get(),
         doFn,
         sideInputReader,
         this,
         mainTag,
-        TupleTagList.empty().getAll(),
+        TupleTagList.empty().getAll() /*sideOutputTags*/,
         new NoOpStepContext(),
         new NoOpAggregatorFactory(),
         windowingStrategy
         );
+
+    pushbackDoFnRunner =
+        PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler);
+
+    try {
+      doFn.setup();
+    } catch (Exception e) {
+      Throwables.propagate(e);
+    }
+
   }
 
   @Override
   public void beginWindow(long windowId)
   {
-    doFnRunner.startBundle();
+    pushbackDoFnRunner.startBundle();
     /*
    Collection<Aggregator<?, ?>> aggregators = AggregatorRetriever.getAggregators(doFn);
     if (!aggregators.isEmpty()) {
@@ -131,14 +258,14 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
   @Override
   public void endWindow()
   {
-    doFnRunner.finishBundle();
+    pushbackDoFnRunner.finishBundle();
   }
 
   /**
   * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode.
    * It is called from {@link org.apache.beam.sdk.util.SimpleDoFnRunner}.
    */
-  public class NoOpAggregatorFactory implements AggregatorFactory {
+  public static class NoOpAggregatorFactory implements AggregatorFactory {
 
     private NoOpAggregatorFactory() {
     }
@@ -147,31 +274,52 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
     public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn(
         Class<?> fnClass, ExecutionContext.StepContext step,
         String name, CombineFn<InputT, AccumT, OutputT> combine) {
-      return new Aggregator<InputT, OutputT>() {
+      return new NoOpAggregator<InputT, OutputT>();
+    }
 
-        @Override
-        public void addValue(InputT value)
-        {
-        }
+    private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>, java.io.Serializable
+    {
+      private static final long serialVersionUID = 1L;
 
-        @Override
-        public String getName()
-        {
-          // TODO Auto-generated method stub
-          return null;
-        }
+      @Override
+      public void addValue(InputT value)
+      {
+      }
+
+      @Override
+      public String getName()
+      {
+        // TODO Auto-generated method stub
+        return null;
+      }
+
+      @Override
+      public CombineFn<InputT, ?, OutputT> getCombineFn()
+      {
+        // TODO Auto-generated method stub
+        return null;
+      }
+
+    }
 
-        @Override
-        public CombineFn<InputT, ?, OutputT> getCombineFn()
-        {
-          // TODO Auto-generated method stub
-          return null;
-        }
 
-      };
-    }
   }
 
+  private static class LongMin {
+    long state = Long.MAX_VALUE;
 
+    public void add(long l) {
+      state = Math.min(state, l);
+    }
+
+    public long get() {
+      return state;
+    }
+
+    public void clear() {
+      state = Long.MAX_VALUE;
+    }
+
+  }
 
 }
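
Note on the watermark handling above: when a ParDo has side inputs, elements
whose side inputs are not yet available are pushed back, and the LongMin helper
tracks the minimum timestamp among them; processWatermark() then only advances
the output watermark to min(pushed-back minimum, input watermark). A minimal,
self-contained sketch of that hold rule (illustrative names, not the operator's
API):

    import java.util.ArrayList;
    import java.util.List;

    /** Sketch only: the watermark hold applied while elements are pushed back. */
    class WatermarkHoldSketch {
      private long outputWatermark = Long.MIN_VALUE;
      private final List<Long> pushedBackTimestamps = new ArrayList<>();

      /** Returns the watermark to emit, or null if it may not advance yet. */
      Long onInputWatermark(long inputWatermark) {
        long pushedBackMin = Long.MAX_VALUE; // same role as the LongMin field above
        for (long t : pushedBackTimestamps) {
          pushedBackMin = Math.min(pushedBackMin, t);
        }
        long potential = Math.min(pushedBackMin, inputWatermark);
        if (potential > outputWatermark) {
          outputWatermark = potential; // watermarks must never move backwards
          return outputWatermark;
        }
        return null; // held back by pushed-back elements or a stale watermark
      }
    }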

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
index 39114fe..6ee82ea 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
@@ -28,8 +28,11 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValue;
 
 import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.google.common.base.Throwables;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
@@ -40,10 +43,14 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import java.io.IOException;
 
 /**
- * Apex input operator that wraps Beam UnboundedSource.
+ * Apex input operator that wraps Beam {@link UnboundedSource}.
  */
 public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
     implements InputOperator {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      ApexReadUnboundedInputOperator.class);
+  private boolean traceTuples = false;
+  private long outputWatermark = 0;
 
   @Bind(JavaSerializer.class)
   private final SerializablePipelineOptions pipelineOptions;
@@ -51,6 +58,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
   private final UnboundedSource<OutputT, CheckpointMarkT> source;
   private transient UnboundedSource.UnboundedReader<OutputT> reader;
   private transient boolean available = false;
+  @OutputPortFieldAnnotation(optional=true)
   public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output = new DefaultOutputPort<>();
 
   public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, ApexPipelineOptions options) {
@@ -66,12 +74,23 @@ public class ApexReadUnboundedInputOperator<OutputT, 
CheckpointMarkT extends Unb
   @Override
   public void beginWindow(long windowId)
   {
-    Instant mark = reader.getWatermark();
-    output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark.getMillis()));
     if (!available && source instanceof ValuesSource) {
-      // if it's a Create transformation and the input was consumed,
+      // if it's a Create and the input was consumed, emit final watermark
+      emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
       // terminate the stream (allows tests to finish faster)
       BaseOperator.shutdown();
+    } else {
+      emitWatermarkIfNecessary(reader.getWatermark().getMillis());
+    }
+  }
+
+  private void emitWatermarkIfNecessary(long mark) {
+    if (mark > outputWatermark) {
+      outputWatermark = mark;
+      if (traceTuples) {
+        LOG.debug("\nemitting watermark {}\n", mark);
+      }
+      output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark));
     }
   }
 
@@ -83,6 +102,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
   @Override
   public void setup(OperatorContext context)
   {
+    this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this);
     try {
       reader = source.createReader(this.pipelineOptions.get(), null);
       available = reader.start();
@@ -114,6 +134,9 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb
         OutputT data = reader.getCurrent();
         Instant timestamp = reader.getCurrentTimestamp();
         available = reader.advance();
+        if (traceTuples) {
+          LOG.debug("\nemitting {}\n", data);
+        }
         output.emit(DataTuple.of(WindowedValue.of(
             data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)));
       }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
index efb69ee..06940aa 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java
@@ -27,10 +27,13 @@ import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
 
+import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.StandardCoder;
 
+import com.datatorrent.api.Operator;
+
 public interface ApexStreamTuple<T>
 {
   /**
@@ -188,4 +191,12 @@ public interface ApexStreamTuple<T>
 
   }
 
+  final class Logging
+  {
+    public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator)
+    {
+      return options.isTupleTracingEnabled();
+    }
+  }
+
 }
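
The Logging switch above is driven by the new isTupleTracingEnabled() pipeline
option; the operator argument is currently unused. A short usage sketch (the
option and factory calls are the real ones from this commit; the class name is
illustrative):

    import org.apache.beam.runners.apex.ApexPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    class TracingOptionsExample {
      static ApexPipelineOptions tracingEnabled() {
        ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
        // Operators check this once in setup() via ApexStreamTuple.Logging.isDebugEnabled(...)
        options.setTupleTracingEnabled(true);
        return options;
      }
    }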

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java
deleted file mode 100644
index ffe1a29..0000000
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.apex.translators.utils;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.values.PCollectionView;
-
-import java.io.Serializable;
-
-import javax.annotation.Nullable;
-
-/**
- * no-op side input reader.
- */
-public class NoOpSideInputReader implements SideInputReader, Serializable {
-  @Nullable
-  @Override
-  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
-    return null;
-  }
-
-  @Override
-  public <T> boolean contains(PCollectionView<T> view) {
-    return false;
-  }
-
-  @Override
-  public boolean isEmpty() {
-    return false;
-  }
-}
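
The deleted reader above answered isEmpty() with false despite never containing
a view; ApexParDoOperator.setup() now starts from the SDK's NullSideInputReader
and swaps in a SideInputHandler only when side inputs exist. A sketch of the
empty-reader replacement (import paths assume the SDK layout used by this
commit):

    import java.util.Collections;
    import java.util.List;
    import org.apache.beam.sdk.util.NullSideInputReader;
    import org.apache.beam.sdk.util.SideInputReader;
    import org.apache.beam.sdk.values.PCollectionView;

    class EmptySideInputs {
      static SideInputReader emptyReader() {
        List<PCollectionView<?>> none = Collections.emptyList();
        // Unlike the removed NoOpSideInputReader, this reader reports isEmpty() == true.
        return NullSideInputReader.of(none);
      }
    }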

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java
new file mode 100644
index 0000000..3573d31
--- /dev/null
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.apex.examples;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.TestApexRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.Read;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.runners.dataflow.TestCountingSource;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * For debugging only.
+ */
+@Ignore
+@RunWith(JUnit4.class)
+public class IntTest implements java.io.Serializable
+{
+
+  @Test
+  public void test()
+  {
+    ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
+    options.setTupleTracingEnabled(true);
+    options.setRunner(TestApexRunner.class);
+    Pipeline p = Pipeline.create(options);
+    boolean timeBound = false;
+
+    TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting();
+    //List<KV<Integer,Integer>> values = Lists.newArrayList(
+    //    KV.of(0, 99), KV.of(0, 99), KV.of(0, 98));
+
+    //UnboundedSource<KV<Integer,Integer>, ?> source = new ValuesSource<>(values,
+    //    KvCoder.of(VarIntCoder.of(), VarIntCoder.of()));
+
+    if (true) {
+      source = source.withDedup();
+    }
+
+    PCollection<KV<Integer, Integer>> output =
+        timeBound
+        ? p.apply(Read.from(source).withMaxReadTime(Duration.millis(200)))
+        : p.apply(Read.from(source).withMaxNumRecords(NUM_RECORDS));
+
+    List<KV<Integer, Integer>> expectedOutput = new ArrayList<>();
+    for (int i = 0; i < NUM_RECORDS; i++) {
+      expectedOutput.add(KV.of(0, i));
+    }
+
+    // Because some of the NUM_RECORDS elements read are dupes, the final output
+    // will only have output from 0 to n where n < NUM_RECORDS.
+    PAssert.that(output).satisfies(new Checker(true, timeBound));
+
+    p.run();
+  }
+
+  private static final int NUM_RECORDS = 10;
+  private static class Checker implements SerializableFunction<Iterable<KV<Integer, Integer>>, Void>
+  {
+    private final boolean dedup;
+    private final boolean timeBound;
+
+    Checker(boolean dedup, boolean timeBound)
+    {
+      this.dedup = dedup;
+      this.timeBound = timeBound;
+    }
+
+    @Override
+    public Void apply(Iterable<KV<Integer, Integer>> input)
+    {
+      List<Integer> values = new ArrayList<>();
+      for (KV<Integer, Integer> kv : input) {
+        assertEquals(0, (int)kv.getKey());
+        values.add(kv.getValue());
+      }
+      if (timeBound) {
+        assertTrue(values.size() >= 1);
+      } else if (dedup) {
+        // Verify that at least some data came through. The chance of 90% of the input
+        // being duplicates is essentially zero.
+        assertTrue(values.size() > NUM_RECORDS / 10 && values.size() <= NUM_RECORDS);
+      } else {
+        assertEquals(NUM_RECORDS, values.size());
+      }
+      Collections.sort(values);
+      for (int i = 0; i < values.size(); i++) {
+        assertEquals(i, (int)values.get(i));
+      }
+      //if (finalizeTracker != null) {
+      //  assertThat(finalizeTracker, containsInAnyOrder(values.size() - 1));
+      //}
+      return null;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
deleted file mode 100644
index 0ee3442..0000000
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
+++ /dev/null
@@ -1,207 +0,0 @@
-  /*
-   * Licensed to the Apache Software Foundation (ASF) under one
-   * or more contributor license agreements.  See the NOTICE file
-   * distributed with this work for additional information
-   * regarding copyright ownership.  The ASF licenses this file
-   * to you under the Apache License, Version 2.0 (the
-   * "License"); you may not use this file except in compliance
-   * with the License.  You may obtain a copy of the License at
-   *
-   *     http://www.apache.org/licenses/LICENSE-2.0
-   *
-   * Unless required by applicable law or agreed to in writing, software
-   * distributed under the License is distributed on an "AS IS" BASIS,
-   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   * See the License for the specific language governing permissions and
-   * limitations under the License.
-   */
-
-package org.apache.beam.runners.apex.examples;
-
-
-  import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-  import static org.hamcrest.Matchers.is;
-  import static org.junit.Assert.assertThat;
-
-import org.apache.beam.runners.apex.ApexPipelineOptions;
-import org.apache.beam.runners.apex.TestApexRunner;
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.NeedsRunner;
-  import org.apache.beam.sdk.testing.PAssert;
-  import org.apache.beam.sdk.testing.RunnableOnService;
-  import org.apache.beam.sdk.testing.TestPipeline;
-  import org.apache.beam.sdk.transforms.Count;
-  import org.apache.beam.sdk.transforms.DoFn;
-  import org.apache.beam.sdk.transforms.Max;
-  import org.apache.beam.sdk.transforms.Min;
-  import org.apache.beam.sdk.transforms.PTransform;
-  import org.apache.beam.sdk.transforms.ParDo;
-  import org.apache.beam.sdk.transforms.RemoveDuplicates;
-  import org.apache.beam.sdk.transforms.SerializableFunction;
-  import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.windowing.FixedWindows;
-import org.apache.beam.sdk.transforms.windowing.Window;
-import org.apache.beam.sdk.values.PCollection;
-  import org.joda.time.Duration;
-  import org.joda.time.Instant;
-  import org.junit.Test;
-  import org.junit.experimental.categories.Category;
-  import org.junit.runner.RunWith;
-  import org.junit.runners.JUnit4;
-
-  /**
-   * Tests for {@link CountingInput}.
-   */
-  @RunWith(JUnit4.class)
-  public class IntTests {
-    public static void addCountingAsserts(PCollection<Long> input, long numElements) {
-      // Count == numElements
-      PAssert.thatSingleton(input.apply("Count", Count.<Long>globally()))
-          .isEqualTo(numElements);
-      // Unique count == numElements
-      PAssert.thatSingleton(
-              input
-                  .apply(RemoveDuplicates.<Long>create())
-                  .apply("UniqueCount", Count.<Long>globally()))
-          .isEqualTo(numElements);
-      // Min == 0
-      PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(0L);
-      // Max == numElements-1
-      PAssert.thatSingleton(input.apply("Max", Max.<Long>globally()))
-          .isEqualTo(numElements - 1);
-    }
-
-    @Test
-    @Category(RunnableOnService.class)
-    public void testBoundedInput() {
-      //Pipeline p = TestPipeline.create();
-      ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
-      options.setRunner(TestApexRunner.class);
-      Pipeline p = Pipeline.create(options);
-
-      long numElements = 1000;
-      PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
-
-      addCountingAsserts(input, numElements);
-      p.run();
-    }
-
-    @Test
-    public void testBoundedDisplayData() {
-      PTransform<?, ?> input = CountingInput.upTo(1234);
-      DisplayData displayData = DisplayData.from(input);
-      assertThat(displayData, hasDisplayItem("upTo", 1234));
-    }
-
-    @Test
-    @Category(RunnableOnService.class)
-    public void testUnboundedInput() {
-      //Pipeline p = TestPipeline.create();
-      ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
-      options.setRunner(TestApexRunner.class);
-      Pipeline p = Pipeline.create(options);
-
-
-      long numElements = 1000;
-
-      PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements));
-
-//      input = input.apply(Window.<Long>into(FixedWindows.of(Duration.standardSeconds(10))));
-
-      addCountingAsserts(input, numElements);
-      p.run();
-    }
-
-    @Test
-    @Category(NeedsRunner.class)
-    public void testUnboundedInputRate() {
-      Pipeline p = TestPipeline.create();
-      long numElements = 5000;
-
-      long elemsPerPeriod = 10L;
-      Duration periodLength = Duration.millis(8);
-      PCollection<Long> input =
-          p.apply(
-              CountingInput.unbounded()
-                  .withRate(elemsPerPeriod, periodLength)
-                  .withMaxNumRecords(numElements));
-
-      addCountingAsserts(input, numElements);
-      long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod;
-      Instant startTime = Instant.now();
-      p.run();
-      Instant endTime = Instant.now();
-      assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true));
-    }
-
-    private static class ElementValueDiff extends DoFn<Long, Long> {
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        c.output(c.element() - c.timestamp().getMillis());
-      }
-    }
-
-    @Test
-    @Category(RunnableOnService.class)
-    public void testUnboundedInputTimestamps() {
-      Pipeline p = TestPipeline.create();
-      long numElements = 1000;
-
-      PCollection<Long> input =
-          p.apply(
-              CountingInput.unbounded()
-                  .withTimestampFn(new ValueAsTimestampFn())
-                  .withMaxNumRecords(numElements));
-      addCountingAsserts(input, numElements);
-
-      PCollection<Long> diffs =
-          input
-              .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
-              .apply("RemoveDuplicateTimestamps", 
RemoveDuplicates.<Long>create());
-      // This assert also confirms that diffs only has one unique value.
-      PAssert.thatSingleton(diffs).isEqualTo(0L);
-
-      p.run();
-    }
-
-    @Test
-    public void testUnboundedDisplayData() {
-      Duration maxReadTime = Duration.standardHours(5);
-      SerializableFunction<Long, Instant> timestampFn = new SerializableFunction<Long, Instant>() {
-        @Override
-        public Instant apply(Long input) {
-          return Instant.now();
-        }
-      };
-
-      PTransform<?, ?> input = CountingInput.unbounded()
-          .withMaxNumRecords(1234)
-          .withMaxReadTime(maxReadTime)
-          .withTimestampFn(timestampFn);
-
-      DisplayData displayData = DisplayData.from(input);
-
-      assertThat(displayData, hasDisplayItem("maxRecords", 1234));
-      assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
-      assertThat(displayData, hasDisplayItem("timestampFn", 
timestampFn.getClass()));
-    }
-
-    /**
-     * A timestamp function that uses the given value as the timestamp. Because the input values will
-     * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out
-     * in {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)}.
-     */
-    private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> {
-      @Override
-      public Instant apply(Long input) {
-        return new Instant(input);
-      }
-    }
-
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index 06aaf55..6239021 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -20,20 +20,25 @@ package org.apache.beam.runners.apex.translators;
 
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.runners.apex.TestApexRunner;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator;
 import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionView;
 
 import com.datatorrent.api.DAG;
+import com.datatorrent.lib.util.KryoCloneUtils;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 
@@ -48,6 +53,7 @@ import org.slf4j.LoggerFactory;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.regex.Pattern;
@@ -123,13 +129,11 @@ public class ParDoBoundTranslatorTest {
     }
   }
 
-
-  @Ignore
   @Test
   public void testAssertionFailure() throws Exception {
     ApexPipelineOptions options = PipelineOptionsFactory.create()
         .as(ApexPipelineOptions.class);
-    options.setRunner(ApexRunner.class);
+    options.setRunner(TestApexRunner.class);
     Pipeline pipeline = Pipeline.create(options);
 
     PCollection<Integer> pcollection = pipeline
@@ -149,6 +153,16 @@ public class ParDoBoundTranslatorTest {
         expectedPattern.matcher(exc.getMessage()).find());
   }
 
+  @Test
+  public void testContainsInAnyOrder() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class);
+    options.setRunner(TestApexRunner.class);
+    Pipeline pipeline = Pipeline.create(options);
+    PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
+    PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3);
+    pipeline.run();
+  }
+
   private static Throwable runExpectingAssertionFailure(Pipeline pipeline) {
     // We cannot use thrown.expect(AssertionError.class) because the AssertionError
     // is first caught by JUnit and causes a test failure.
@@ -161,4 +175,19 @@ public class ParDoBoundTranslatorTest {
     throw new RuntimeException("unreachable");
   }
 
+  @Test
+  public void testSerialization() throws Exception {
+    ApexPipelineOptions options = PipelineOptionsFactory.create()
+        .as(ApexPipelineOptions.class);
+    ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options,
+        new Add(0), WindowingStrategy.globalDefault(), Collections.<PCollectionView<?>>emptyList());
+    operator.setup(null);
+    operator.beginWindow(0);
+    WindowedValue<Integer> wv = WindowedValue.valueInGlobalWindow(0);
+    operator.input.process(ApexStreamTuple.DataTuple.of(wv));
+    operator.input.process(ApexStreamTuple.WatermarkTuple.<WindowedValue<Integer>>of(0));
+    operator.endWindow();
+    Assert.assertNotNull("Serialization", KryoCloneUtils.cloneObject(operator));
+  }
 }
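
Note on testSerialization() above: Apex checkpoints operators by serializing
them, so any state added for side-input support (handlers, pushed-back values,
state internals) must survive a Kryo round trip. A sketch of the check the test
performs (KryoCloneUtils is the real utility used above; the helper method is
illustrative):

    import com.datatorrent.lib.util.KryoCloneUtils;

    class CloneCheck {
      /** Returns a deep copy; a failure surfaces as an exception or a null result. */
      static <T> T roundTrip(T operator) throws Exception {
        return KryoCloneUtils.cloneObject(operator);
      }
    }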

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/resources/log4j.properties b/runners/apex/src/test/resources/log4j.properties
index 84a6f68..c0efc5d 100644
--- a/runners/apex/src/test/resources/log4j.properties
+++ b/runners/apex/src/test/resources/log4j.properties
@@ -26,8 +26,8 @@ log4j.appender.testlogger.target = System.err
 log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
 log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 
-log4j.logger.org=info
-#log4j.logger.org.apache.commons.beanutils=warn
+log4j.logger.org=debug
+log4j.logger.org.apache.commons.beanutils=warn
 log4j.logger.com.datatorrent=info
 log4j.logger.org.apache.apex=debug
 log4j.logger.org.apache.beam.runners.apex=debug
