BEAM-261 Read.Bounded and FlattenPCollection.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/074b18f6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/074b18f6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/074b18f6 Branch: refs/heads/master Commit: 074b18f6ae0cfc1a3cc986f89ded6a45e0a3eb57 Parents: a7e430d Author: Thomas Weise <t...@apache.org> Authored: Sun Sep 11 20:34:08 2016 -0700 Committer: Thomas Weise <t...@apache.org> Committed: Sun Oct 16 23:25:28 2016 -0700 ---------------------------------------------------------------------- runners/apex/pom.xml | 2 +- .../runners/apex/ApexPipelineTranslator.java | 16 ++ .../apache/beam/runners/apex/ApexRunner.java | 10 +- .../FlattenPCollectionTranslator.java | 53 ++++- .../apex/translators/TranslationContext.java | 24 +-- .../functions/ApexGroupByKeyOperator.java | 6 +- .../functions/ApexParDoOperator.java | 6 +- .../beam/runners/apex/examples/IntTests.java | 207 +++++++++++++++++++ .../translators/ReadUnboundTranslatorTest.java | 2 +- 9 files changed, 284 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/pom.xml ---------------------------------------------------------------------- diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml index 21e53a8..e9377b4 100644 --- a/runners/apex/pom.xml +++ b/runners/apex/pom.xml @@ -28,7 +28,7 @@ <relativePath>../pom.xml</relativePath> </parent> - <artifactId>beam-runners-apex_3.4.0</artifactId> + <artifactId>beam-runners-apex</artifactId> <name>Apache Beam :: Runners :: Apex</name> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java index 8ea7139..b0391b4 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java @@ -25,6 +25,8 @@ import org.apache.beam.runners.apex.translators.ParDoBoundTranslator; import org.apache.beam.runners.apex.translators.ReadUnboundedTranslator; import org.apache.beam.runners.apex.translators.TransformTranslator; import org.apache.beam.runners.apex.translators.TranslationContext; +import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; +import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.runners.TransformTreeNode; @@ -64,6 +66,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { // register TransformTranslators registerTransformTranslator(ParDo.Bound.class, new ParDoBoundTranslator()); registerTransformTranslator(Read.Unbounded.class, new ReadUnboundedTranslator()); + registerTransformTranslator(Read.Bounded.class, new ReadBoundedTranslator()); registerTransformTranslator(GroupByKey.class, new GroupByKeyTranslator()); registerTransformTranslator(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator()); @@ -130,5 +133,18 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { return transformTranslators.get(transformClass); } + private static class ReadBoundedTranslator<T> implements TransformTranslator<Read.Bounded<T>> { + private static final long serialVersionUID = 1L; + + @Override + public void translate(Read.Bounded<T> transform, TranslationContext context) { + // TODO: adapter is visibleForTesting + BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(transform.getSource()); + ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( + unboundedSource, context.getPipelineOptions()); + context.addOperator(operator, operator.output); + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 87c8f97..5fa3f23 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -20,10 +20,7 @@ package org.apache.beam.runners.apex; import static com.google.common.base.Preconditions.checkArgument; import org.apache.beam.runners.apex.translators.TranslationContext; -import org.apache.beam.runners.core.UnboundedReadFromBoundedSource; -import org.apache.beam.runners.core.UnboundedReadFromBoundedSource.BoundedToUnboundedSourceAdapter; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; import org.apache.beam.sdk.transforms.Create; @@ -33,9 +30,8 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.util.AssignWindows; +import org.apache.beam.runners.core.AssignWindows; import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; @@ -70,6 +66,8 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { @Override public <OutputT extends POutput, InputT extends PInput> OutputT apply( PTransform<InputT, OutputT> transform, InputT input) { +//System.out.println("transform: " + transform); + if (Window.Bound.class.equals(transform.getClass())) { return (OutputT) ((PCollection) input).apply( new AssignWindowsAndSetStrategy((Window.Bound) transform)); @@ -79,8 +77,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED); - } else if (Read.Bounded.class.equals(transform.getClass())) { - return (OutputT) ((PBegin) input).apply(new UnboundedReadFromBoundedSource<>(((Read.Bounded)transform).getSource())); } else { return super.apply(transform, input); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java index f228149..e153867 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java @@ -18,11 +18,15 @@ package org.apache.beam.runners.apex.translators; +import java.util.List; + +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import com.datatorrent.lib.stream.StreamMerger; +import com.google.common.collect.Lists; /** * Flatten.FlattenPCollectionList translation to Apex operator. @@ -34,19 +38,46 @@ public class FlattenPCollectionTranslator<T> implements @Override public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) { - StreamMerger<T> operator = null; - PCollectionList<T> collections = context.getInput(); - if (collections.size() > 2) { - throw new UnsupportedOperationException("Currently supports only 2 collections: " + transform); - } - for (PCollection<T> collection : collections.getAll()) { - if (null == operator) { - operator = new StreamMerger<T>(); - context.addStream(collection, operator.data1); + PCollection<T> firstCollection = null; + PCollectionList<T> input = context.getInput(); + List<PCollection<T>> collections = input.getAll(); + List<PCollection<T>> remainingCollections = Lists.newArrayList(); + while (!collections.isEmpty()) { + for (PCollection<T> collection : collections) { + if (null == firstCollection) { + firstCollection = collection; + } else { + StreamMerger<T> operator = new StreamMerger<>(); + context.addStream(firstCollection, operator.data1); + context.addStream(collection, operator.data2); + if (collections.size() > 2) { + PCollection<T> resultCollection = intermediateCollection(collection, collection.getCoder()); + context.addOperator(operator, operator.out, resultCollection); + remainingCollections.add(resultCollection); + } else { + // final stream merge + context.addOperator(operator, operator.out); + } + firstCollection = null; + } + } + if (firstCollection != null) { + // push to next merge level + remainingCollections.add(firstCollection); + } + if (remainingCollections.size() > 1) { + collections = remainingCollections; + remainingCollections = Lists.newArrayList(); } else { - context.addStream(collection, operator.data2); + collections = Lists.newArrayList(); } } - context.addOperator(operator, operator.out); } + + public static <T> PCollection<T> intermediateCollection(PCollection<T> input, Coder<T> outputCoder) { + PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); + output.setCoder(outputCoder); + return output; + } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java index 92afd58..ab7cd0a 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java @@ -82,30 +82,22 @@ public class TranslationContext { } public void addOperator(Operator operator, OutputPort port) { - // Apex DAG requires a unique operator name - // use the transform's name and make it unique - String name = getCurrentTransform().getFullName(); - for (int i=1; this.operators.containsKey(name); name = getCurrentTransform().getFullName() + i++); - this.operators.put(name, operator); - PCollection<?> output = getOutput(); - this.streams.put(output, (Pair)new ImmutablePair<>(port, new ArrayList<>())); + addOperator(operator, port, this.<PCollection<?>>getOutput()); } /** - * Add operator that is internal to a transformation. - * @param output + * Add intermediate operator for the current transformation. * @param operator * @param port - * @param name + * @param output */ - public <T> PInput addInternalOperator(Operator operator, OutputPort port, String name, Coder<T> coder) { - checkArgument(this.operators.get(name) == null, "duplicate operator " + name); + public void addOperator(Operator operator, OutputPort port, PCollection output) { + // Apex DAG requires a unique operator name + // use the transform's name and make it unique + String name = getCurrentTransform().getFullName(); + for (int i=1; this.operators.containsKey(name); name = getCurrentTransform().getFullName() + i++); this.operators.put(name, operator); - PCollection<T> input = getInput(); - PCollection<T> output = PCollection.createPrimitiveOutputInternal(input.getPipeline(), input.getWindowingStrategy(), input.isBounded()); - output.setCoder(coder); this.streams.put(output, (Pair)new ImmutablePair<>(port, new ArrayList<>())); - return output; } public void addStream(PInput input, InputPort inputPort) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java index 4608c92..29e1b32 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java @@ -31,6 +31,7 @@ import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions; import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; +import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.options.PipelineOptions; @@ -42,7 +43,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.KeyedWorkItems; -import org.apache.beam.sdk.util.SystemReduceFn; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingInternals; @@ -56,14 +56,14 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; +import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.Operator; import com.datatorrent.api.StreamCodec; -import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; -import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; +import com.esotericsoftware.kryo.serializers.JavaSerializer; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; import com.google.common.collect.HashMultimap; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java index 8005832..d358d14 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java @@ -22,14 +22,14 @@ import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translators.utils.NoOpStepContext; import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; -import org.apache.beam.sdk.util.DoFnRunner; -import org.apache.beam.sdk.util.DoFnRunners; -import org.apache.beam.sdk.util.DoFnRunners.OutputManager; import org.apache.beam.sdk.util.ExecutionContext; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java new file mode 100644 index 0000000..0ee3442 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java @@ -0,0 +1,207 @@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.examples; + + + import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.is; + import static org.junit.Assert.assertThat; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.TestApexRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.CountingInput; +import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.NeedsRunner; + import org.apache.beam.sdk.testing.PAssert; + import org.apache.beam.sdk.testing.RunnableOnService; + import org.apache.beam.sdk.testing.TestPipeline; + import org.apache.beam.sdk.transforms.Count; + import org.apache.beam.sdk.transforms.DoFn; + import org.apache.beam.sdk.transforms.Max; + import org.apache.beam.sdk.transforms.Min; + import org.apache.beam.sdk.transforms.PTransform; + import org.apache.beam.sdk.transforms.ParDo; + import org.apache.beam.sdk.transforms.RemoveDuplicates; + import org.apache.beam.sdk.transforms.SerializableFunction; + import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.Window; +import org.apache.beam.sdk.values.PCollection; + import org.joda.time.Duration; + import org.joda.time.Instant; + import org.junit.Test; + import org.junit.experimental.categories.Category; + import org.junit.runner.RunWith; + import org.junit.runners.JUnit4; + + /** + * Tests for {@link CountingInput}. + */ + @RunWith(JUnit4.class) + public class IntTests { + public static void addCountingAsserts(PCollection<Long> input, long numElements) { + // Count == numElements + PAssert.thatSingleton(input.apply("Count", Count.<Long>globally())) + .isEqualTo(numElements); + // Unique count == numElements + PAssert.thatSingleton( + input + .apply(RemoveDuplicates.<Long>create()) + .apply("UniqueCount", Count.<Long>globally())) + .isEqualTo(numElements); + // Min == 0 + PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(0L); + // Max == numElements-1 + PAssert.thatSingleton(input.apply("Max", Max.<Long>globally())) + .isEqualTo(numElements - 1); + } + + @Test + @Category(RunnableOnService.class) + public void testBoundedInput() { + //Pipeline p = TestPipeline.create(); + ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); + options.setRunner(TestApexRunner.class); + Pipeline p = Pipeline.create(options); + + long numElements = 1000; + PCollection<Long> input = p.apply(CountingInput.upTo(numElements)); + + addCountingAsserts(input, numElements); + p.run(); + } + + @Test + public void testBoundedDisplayData() { + PTransform<?, ?> input = CountingInput.upTo(1234); + DisplayData displayData = DisplayData.from(input); + assertThat(displayData, hasDisplayItem("upTo", 1234)); + } + + @Test + @Category(RunnableOnService.class) + public void testUnboundedInput() { + //Pipeline p = TestPipeline.create(); + ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); + options.setRunner(TestApexRunner.class); + Pipeline p = Pipeline.create(options); + + + long numElements = 1000; + + PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements)); + +// input = input.apply(Window.<Long>into(FixedWindows.of(Duration.standardSeconds(10)))); + + addCountingAsserts(input, numElements); + p.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testUnboundedInputRate() { + Pipeline p = TestPipeline.create(); + long numElements = 5000; + + long elemsPerPeriod = 10L; + Duration periodLength = Duration.millis(8); + PCollection<Long> input = + p.apply( + CountingInput.unbounded() + .withRate(elemsPerPeriod, periodLength) + .withMaxNumRecords(numElements)); + + addCountingAsserts(input, numElements); + long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod; + Instant startTime = Instant.now(); + p.run(); + Instant endTime = Instant.now(); + assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true)); + } + + private static class ElementValueDiff extends DoFn<Long, Long> { + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + c.output(c.element() - c.timestamp().getMillis()); + } + } + + @Test + @Category(RunnableOnService.class) + public void testUnboundedInputTimestamps() { + Pipeline p = TestPipeline.create(); + long numElements = 1000; + + PCollection<Long> input = + p.apply( + CountingInput.unbounded() + .withTimestampFn(new ValueAsTimestampFn()) + .withMaxNumRecords(numElements)); + addCountingAsserts(input, numElements); + + PCollection<Long> diffs = + input + .apply("TimestampDiff", ParDo.of(new ElementValueDiff())) + .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create()); + // This assert also confirms that diffs only has one unique value. + PAssert.thatSingleton(diffs).isEqualTo(0L); + + p.run(); + } + + @Test + public void testUnboundedDisplayData() { + Duration maxReadTime = Duration.standardHours(5); + SerializableFunction<Long, Instant> timestampFn = new SerializableFunction<Long, Instant>() { + @Override + public Instant apply(Long input) { + return Instant.now(); + } + }; + + PTransform<?, ?> input = CountingInput.unbounded() + .withMaxNumRecords(1234) + .withMaxReadTime(maxReadTime) + .withTimestampFn(timestampFn); + + DisplayData displayData = DisplayData.from(input); + + assertThat(displayData, hasDisplayItem("maxRecords", 1234)); + assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime)); + assertThat(displayData, hasDisplayItem("timestampFn", timestampFn.getClass())); + } + + /** + * A timestamp function that uses the given value as the timestamp. Because the input values will + * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out + * in {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)}. + */ + private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> { + @Override + public Instant apply(Long input) { + return new Instant(input); + } + } + + + + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/074b18f6/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java index 6260632..f954537 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ReadUnboundTranslatorTest.java @@ -99,7 +99,7 @@ public class ReadUnboundTranslatorTest { ApexRunnerResult result = (ApexRunnerResult)p.run(); DAG dag = result.getApexDAG(); - DAG.OperatorMeta om = dag.getOperatorMeta("Read(BoundedCountingSource)/Read(BoundedCountingSource)/Read(BoundedToUnboundedSourceAdapter)"); + DAG.OperatorMeta om = dag.getOperatorMeta("Read(BoundedCountingSource)"); Assert.assertNotNull(om); Assert.assertEquals(om.getOperator().getClass(), ApexReadUnboundedInputOperator.class);