BEAM-261 Read.Bounded and FlattenPCollection.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/074b18f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/074b18f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/074b18f6

Branch: refs/heads/apex-runner
Commit: 074b18f6ae0cfc1a3cc986f89ded6a45e0a3eb57
Parents: a7e430d
Author: Thomas Weise <t...@apache.org>
Authored: Sun Sep 11 20:34:08 2016 -0700
Committer: Thomas Weise <t...@apache.org>
Committed: Sun Oct 16 23:25:28 2016 -0700

----------------------------------------------------------------------
 runners/apex/pom.xml                            |   2 +-
 .../runners/apex/ApexPipelineTranslator.java    |  16 ++
 .../apache/beam/runners/apex/ApexRunner.java    |  10 +-
 .../FlattenPCollectionTranslator.java           |  53 ++++-
 .../apex/translators/TranslationContext.java    |  24 +--
 .../functions/ApexGroupByKeyOperator.java       |   6 +-
 .../functions/ApexParDoOperator.java            |   6 +-
 .../beam/runners/apex/examples/IntTests.java    | 207 +++++++++++++++++++
 .../translators/ReadUnboundTranslatorTest.java  |   2 +-
 9 files changed, 284 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 21e53a8..e9377b4 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -28,7 +28,7 @@
     <relativePath>../pom.xml</relativePath>
   </parent>
 
-  <artifactId>beam-runners-apex_3.4.0</artifactId>
+  <artifactId>beam-runners-apex</artifactId>
 
   <name>Apache Beam :: Runners :: Apex</name>
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index 8ea7139..b0391b4 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -25,6 +25,8 @@ import 
org.apache.beam.runners.apex.translators.ParDoBoundTranslator;
 import org.apache.beam.runners.apex.translators.ReadUnboundedTranslator;
 import org.apache.beam.runners.apex.translators.TransformTranslator;
 import org.apache.beam.runners.apex.translators.TranslationContext;
+import 
org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator;
+import 
org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.runners.TransformTreeNode;
@@ -64,6 +66,7 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
     // register TransformTranslators
     registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator());
     registerTransformTranslator(Read.Unbounded.class, new 
ReadUnboundedTranslator());
+    registerTransformTranslator(Read.Bounded.class, new 
ReadBoundedTranslator());
     registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator());
     registerTransformTranslator(Flatten.FlattenPCollectionList.class,
         new FlattenPCollectionTranslator());
@@ -130,5 +133,18 @@ public class ApexPipelineTranslator implements 
Pipeline.PipelineVisitor {
     return transformTranslators.get(transformClass);
   }
 
+  private static class ReadBoundedTranslator<T> implements 
TransformTranslator<Read.Bounded<T>> {
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public void translate(Read.Bounded<T> transform, TranslationContext 
context) {
+      // TODO: adapter is visibleForTesting
+      BoundedToUnboundedSourceAdapter unboundedSource = new 
BoundedToUnboundedSourceAdapter<>(transform.getSource());
+      ApexReadUnboundedInputOperator<T, ?> operator = new 
ApexReadUnboundedInputOperator<>(
+          unboundedSource, context.getPipelineOptions());
+      context.addOperator(operator, operator.output);
+    }
+
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 87c8f97..5fa3f23 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -20,10 +20,7 @@ package org.apache.beam.runners.apex;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import org.apache.beam.runners.apex.translators.TranslationContext;
-import org.apache.beam.runners.core.UnboundedReadFromBoundedSource;
-import 
org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.Create;
@@ -33,9 +30,8 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
-import org.apache.beam.sdk.util.AssignWindows;
+import org.apache.beam.runners.core.AssignWindows;
 import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
@@ -70,6 +66,8 @@ public class ApexRunner extends 
PipelineRunner<ApexRunnerResult> {
   @Override
   public <OutputT extends POutput, InputT extends PInput> OutputT apply(
       PTransform<InputT, OutputT> transform, InputT input) {
+//System.out.println("transform: " + transform);
+
     if (Window.Bound.class.equals(transform.getClass())) {
       return (OutputT) ((PCollection) input).apply(
           new AssignWindowsAndSetStrategy((Window.Bound) transform));
@@ -79,8 +77,6 @@ public class ApexRunner extends 
PipelineRunner<ApexRunnerResult> {
               input.getPipeline(),
               WindowingStrategy.globalDefault(),
               PCollection.IsBounded.BOUNDED);
-    } else if (Read.Bounded.class.equals(transform.getClass())) {
-      return (OutputT) ((PBegin) input).apply(new 
UnboundedReadFromBoundedSource<>(((Read.Bounded)transform).getSource()));
     } else {
       return super.apply(transform, input);
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
index f228149..e153867 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java
@@ -18,11 +18,15 @@
 
 package org.apache.beam.runners.apex.translators;
 
+import java.util.List;
+
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 
 import com.datatorrent.lib.stream.StreamMerger;
+import com.google.common.collect.Lists;
 
 /**
  * Flatten.FlattenPCollectionList translation to Apex operator.
@@ -34,19 +38,46 @@ public class FlattenPCollectionTranslator<T> implements
 
   @Override
   public void translate(Flatten.FlattenPCollectionList<T> transform, 
TranslationContext context) {
-    StreamMerger<T> operator = null;
-    PCollectionList<T> collections = context.getInput();
-    if (collections.size() > 2) {
-      throw new UnsupportedOperationException("Currently supports only 2 
collections: " + transform);
-    }
-    for (PCollection<T> collection : collections.getAll()) {
-      if (null == operator) {
-        operator = new StreamMerger<T>();
-        context.addStream(collection, operator.data1);
+    PCollection<T> firstCollection = null;
+    PCollectionList<T> input = context.getInput();
+    List<PCollection<T>> collections = input.getAll();
+    List<PCollection<T>> remainingCollections = Lists.newArrayList();
+    while (!collections.isEmpty()) {
+      for (PCollection<T> collection : collections) {
+        if (null == firstCollection) {
+          firstCollection = collection;
+        } else {
+          StreamMerger<T> operator = new StreamMerger<>();
+          context.addStream(firstCollection, operator.data1);
+          context.addStream(collection, operator.data2);
+          if (collections.size() > 2) {
+            PCollection<T> resultCollection = 
intermediateCollection(collection, collection.getCoder());
+            context.addOperator(operator, operator.out, resultCollection);
+            remainingCollections.add(resultCollection);
+          } else {
+            // final stream merge
+            context.addOperator(operator, operator.out);
+          }
+          firstCollection = null;
+        }
+      }
+      if (firstCollection != null) {
+        // push to next merge level
+        remainingCollections.add(firstCollection);
+      }
+      if (remainingCollections.size() > 1) {
+        collections = remainingCollections;
+        remainingCollections = Lists.newArrayList();
       } else {
-        context.addStream(collection, operator.data2);
+        collections = Lists.newArrayList();
       }
     }
-    context.addOperator(operator, operator.out);
   }
+
+  public static <T> PCollection<T> intermediateCollection(PCollection<T> 
input, Coder<T> outputCoder) {
+    PCollection<T> output = 
PCollection.createPrimitiveOutputInternal(input.getPipeline(), 
input.getWindowingStrategy(), input.isBounded());
+    output.setCoder(outputCoder);
+    return output;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
index 92afd58..ab7cd0a 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java
@@ -82,30 +82,22 @@ public class TranslationContext {
   }
 
   public void addOperator(Operator operator, OutputPort port) {
-    // Apex DAG requires a unique operator name
-    // use the transform's name and make it unique
-    String name = getCurrentTransform().getFullName();
-    for (int i=1; this.operators.containsKey(name); name = 
getCurrentTransform().getFullName() + i++);
-    this.operators.put(name, operator);
-    PCollection<?> output = getOutput();
-    this.streams.put(output, (Pair)new ImmutablePair<>(port, new 
ArrayList<>()));
+    addOperator(operator, port, this.<PCollection<?>>getOutput());
   }
 
   /**
-   * Add operator that is internal to a transformation.
-   * @param output
+   * Add intermediate operator for the current transformation.
    * @param operator
    * @param port
-   * @param name
+   * @param output
    */
-  public <T> PInput addInternalOperator(Operator operator, OutputPort port, 
String name, Coder<T> coder) {
-    checkArgument(this.operators.get(name) == null, "duplicate operator " + 
name);
+  public void addOperator(Operator operator, OutputPort port, PCollection 
output) {
+    // Apex DAG requires a unique operator name
+    // use the transform's name and make it unique
+    String name = getCurrentTransform().getFullName();
+    for (int i=1; this.operators.containsKey(name); name = 
getCurrentTransform().getFullName() + i++);
     this.operators.put(name, operator);
-    PCollection<T> input = getInput();
-    PCollection<T> output = 
PCollection.createPrimitiveOutputInternal(input.getPipeline(), 
input.getWindowingStrategy(), input.isBounded());
-    output.setCoder(coder);
     this.streams.put(output, (Pair)new ImmutablePair<>(port, new 
ArrayList<>()));
-    return output;
   }
 
   public void addStream(PInput input, InputPort inputPort) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
index 4608c92..29e1b32 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java
@@ -31,6 +31,7 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
 import 
org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
 import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -42,7 +43,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.KeyedWorkItem;
 import org.apache.beam.sdk.util.KeyedWorkItems;
-import org.apache.beam.sdk.util.SystemReduceFn;
 import org.apache.beam.sdk.util.TimerInternals;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingInternals;
@@ -56,14 +56,14 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.joda.time.Instant;
 
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.StreamCodec;
-import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Throwables;
 import com.google.common.collect.HashMultimap;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
index 8005832..d358d14 100644
--- 
a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
+++ 
b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java
@@ -22,14 +22,14 @@ import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple;
 import org.apache.beam.runners.apex.translators.utils.NoOpStepContext;
 import 
org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions;
+import org.apache.beam.runners.core.DoFnRunner;
+import org.apache.beam.runners.core.DoFnRunners;
+import org.apache.beam.runners.core.DoFnRunners.OutputManager;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.util.DoFnRunner;
-import org.apache.beam.sdk.util.DoFnRunners;
-import org.apache.beam.sdk.util.DoFnRunners.OutputManager;
 import org.apache.beam.sdk.util.ExecutionContext;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
new file mode 100644
index 0000000..0ee3442
--- /dev/null
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java
@@ -0,0 +1,207 @@
+  /*
+   * Licensed to the Apache Software Foundation (ASF) under one
+   * or more contributor license agreements.  See the NOTICE file
+   * distributed with this work for additional information
+   * regarding copyright ownership.  The ASF licenses this file
+   * to you under the Apache License, Version 2.0 (the
+   * "License"); you may not use this file except in compliance
+   * with the License.  You may obtain a copy of the License at
+   *
+   *     http://www.apache.org/licenses/LICENSE-2.0
+   *
+   * Unless required by applicable law or agreed to in writing, software
+   * distributed under the License is distributed on an "AS IS" BASIS,
+   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   * See the License for the specific language governing permissions and
+   * limitations under the License.
+   */
+
+package org.apache.beam.runners.apex.examples;
+
+
+  import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+  import static org.hamcrest.Matchers.is;
+  import static org.junit.Assert.assertThat;
+
+import org.apache.beam.runners.apex.ApexPipelineOptions;
+import org.apache.beam.runners.apex.TestApexRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.NeedsRunner;
+  import org.apache.beam.sdk.testing.PAssert;
+  import org.apache.beam.sdk.testing.RunnableOnService;
+  import org.apache.beam.sdk.testing.TestPipeline;
+  import org.apache.beam.sdk.transforms.Count;
+  import org.apache.beam.sdk.transforms.DoFn;
+  import org.apache.beam.sdk.transforms.Max;
+  import org.apache.beam.sdk.transforms.Min;
+  import org.apache.beam.sdk.transforms.PTransform;
+  import org.apache.beam.sdk.transforms.ParDo;
+  import org.apache.beam.sdk.transforms.RemoveDuplicates;
+  import org.apache.beam.sdk.transforms.SerializableFunction;
+  import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.PCollection;
+  import org.joda.time.Duration;
+  import org.joda.time.Instant;
+  import org.junit.Test;
+  import org.junit.experimental.categories.Category;
+  import org.junit.runner.RunWith;
+  import org.junit.runners.JUnit4;
+
+  /**
+   * Tests for {@link CountingInput}.
+   */
+  @RunWith(JUnit4.class)
+  public class IntTests {
+    public static void addCountingAsserts(PCollection<Long> input, long 
numElements) {
+      // Count == numElements
+      PAssert.thatSingleton(input.apply("Count", Count.<Long>globally()))
+          .isEqualTo(numElements);
+      // Unique count == numElements
+      PAssert.thatSingleton(
+              input
+                  .apply(RemoveDuplicates.<Long>create())
+                  .apply("UniqueCount", Count.<Long>globally()))
+          .isEqualTo(numElements);
+      // Min == 0
+      PAssert.thatSingleton(input.apply("Min", 
Min.<Long>globally())).isEqualTo(0L);
+      // Max == numElements-1
+      PAssert.thatSingleton(input.apply("Max", Max.<Long>globally()))
+          .isEqualTo(numElements - 1);
+    }
+
+    @Test
+    @Category(RunnableOnService.class)
+    public void testBoundedInput() {
+      //Pipeline p = TestPipeline.create();
+      ApexPipelineOptions options = 
PipelineOptionsFactory.as(ApexPipelineOptions.class);
+      options.setRunner(TestApexRunner.class);
+      Pipeline p = Pipeline.create(options);
+
+      long numElements = 1000;
+      PCollection<Long> input = p.apply(CountingInput.upTo(numElements));
+
+      addCountingAsserts(input, numElements);
+      p.run();
+    }
+
+    @Test
+    public void testBoundedDisplayData() {
+      PTransform<?, ?> input = CountingInput.upTo(1234);
+      DisplayData displayData = DisplayData.from(input);
+      assertThat(displayData, hasDisplayItem("upTo", 1234));
+    }
+
+    @Test
+    @Category(RunnableOnService.class)
+    public void testUnboundedInput() {
+      //Pipeline p = TestPipeline.create();
+      ApexPipelineOptions options = 
PipelineOptionsFactory.as(ApexPipelineOptions.class);
+      options.setRunner(TestApexRunner.class);
+      Pipeline p = Pipeline.create(options);
+
+
+      long numElements = 1000;
+
+      PCollection<Long> input = 
p.apply(CountingInput.unbounded().withMaxNumRecords(numElements));
+
+//      input = 
input.apply(Window.<Long>into(FixedWindows.of(Duration.standardSeconds(10))));
+
+      addCountingAsserts(input, numElements);
+      p.run();
+    }
+
+    @Test
+    @Category(NeedsRunner.class)
+    public void testUnboundedInputRate() {
+      Pipeline p = TestPipeline.create();
+      long numElements = 5000;
+
+      long elemsPerPeriod = 10L;
+      Duration periodLength = Duration.millis(8);
+      PCollection<Long> input =
+          p.apply(
+              CountingInput.unbounded()
+                  .withRate(elemsPerPeriod, periodLength)
+                  .withMaxNumRecords(numElements));
+
+      addCountingAsserts(input, numElements);
+      long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / 
elemsPerPeriod;
+      Instant startTime = Instant.now();
+      p.run();
+      Instant endTime = Instant.now();
+      assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), 
is(true));
+    }
+
+    private static class ElementValueDiff extends DoFn<Long, Long> {
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        c.output(c.element() - c.timestamp().getMillis());
+      }
+    }
+
+    @Test
+    @Category(RunnableOnService.class)
+    public void testUnboundedInputTimestamps() {
+      Pipeline p = TestPipeline.create();
+      long numElements = 1000;
+
+      PCollection<Long> input =
+          p.apply(
+              CountingInput.unbounded()
+                  .withTimestampFn(new ValueAsTimestampFn())
+                  .withMaxNumRecords(numElements));
+      addCountingAsserts(input, numElements);
+
+      PCollection<Long> diffs =
+          input
+              .apply("TimestampDiff", ParDo.of(new ElementValueDiff()))
+              .apply("RemoveDuplicateTimestamps", 
RemoveDuplicates.<Long>create());
+      // This assert also confirms that diffs only has one unique value.
+      PAssert.thatSingleton(diffs).isEqualTo(0L);
+
+      p.run();
+    }
+
+    @Test
+    public void testUnboundedDisplayData() {
+      Duration maxReadTime = Duration.standardHours(5);
+      SerializableFunction<Long, Instant> timestampFn = new 
SerializableFunction<Long, Instant>() {
+        @Override
+        public Instant apply(Long input) {
+          return Instant.now();
+        }
+      };
+
+      PTransform<?, ?> input = CountingInput.unbounded()
+          .withMaxNumRecords(1234)
+          .withMaxReadTime(maxReadTime)
+          .withTimestampFn(timestampFn);
+
+      DisplayData displayData = DisplayData.from(input);
+
+      assertThat(displayData, hasDisplayItem("maxRecords", 1234));
+      assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime));
+      assertThat(displayData, hasDisplayItem("timestampFn", 
timestampFn.getClass()));
+    }
+
+    /**
+     * A timestamp function that uses the given value as the timestamp. 
Because the input values will
+     * not wrap, this function is non-decreasing and meets the timestamp 
function criteria laid out
+     * in {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)}.
+     */
+    private static class ValueAsTimestampFn implements 
SerializableFunction<Long, Instant> {
+      @Override
+      public Instant apply(Long input) {
+        return new Instant(input);
+      }
+    }
+
+
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
----------------------------------------------------------------------
diff --git 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
index 6260632..f954537 100644
--- 
a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
+++ 
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java
@@ -99,7 +99,7 @@ public class ReadUnboundTranslatorTest {
 
     ApexRunnerResult result = (ApexRunnerResult)p.run();
     DAG dag = result.getApexDAG();
-    DAG.OperatorMeta om = 
dag.getOperatorMeta("Read(BoundedCountingSource)/Read(BoundedCountingSource)/Read(BoundedToUnboundedSourceAdapter)");
+    DAG.OperatorMeta om = dag.getOperatorMeta("Read(BoundedCountingSource)");
     Assert.assertNotNull(om);
     Assert.assertEquals(om.getOperator().getClass(), 
ApexReadUnboundedInputOperator.class);
 

Reply via email to