BEAM-261 PCollectionView and side inputs.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09754942 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09754942 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09754942 Branch: refs/heads/apex-runner Commit: 09754942c66c9befffc8df9b3c8a75b819a672e6 Parents: 074b18f Author: Thomas Weise <t...@apache.org> Authored: Sun Sep 25 16:46:44 2016 -0700 Committer: Thomas Weise <t...@apache.org> Committed: Sun Oct 16 23:25:55 2016 -0700 ---------------------------------------------------------------------- .../beam/runners/apex/ApexPipelineOptions.java | 6 + .../runners/apex/ApexPipelineTranslator.java | 19 +- .../apache/beam/runners/apex/ApexRunner.java | 397 ++++++++++++++++++- .../FlattenPCollectionTranslator.java | 26 +- .../apex/translators/ParDoBoundTranslator.java | 22 +- .../apex/translators/TranslationContext.java | 14 +- .../functions/ApexFlattenOperator.java | 113 ++++++ .../functions/ApexGroupByKeyOperator.java | 78 +++- .../functions/ApexParDoOperator.java | 210 ++++++++-- .../io/ApexReadUnboundedInputOperator.java | 31 +- .../apex/translators/utils/ApexStreamTuple.java | 11 + .../translators/utils/NoOpSideInputReader.java | 47 --- .../beam/runners/apex/examples/IntTest.java | 133 +++++++ .../beam/runners/apex/examples/IntTests.java | 207 ---------- .../translators/ParDoBoundTranslatorTest.java | 37 +- .../apex/src/test/resources/log4j.properties | 4 +- 16 files changed, 1028 insertions(+), 327 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java index f70d24c..141a8c1 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java @@ -50,6 +50,12 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab @Default.Boolean(true) boolean isEmbeddedExecutionDebugMode(); + @Description("output data received and emitted on ports (for debugging)") + void setTupleTracingEnabled(boolean enabled); + + @Default.Boolean(false) + boolean isTupleTracingEnabled(); + @Description("how long the client should wait for the pipeline to run") void setRunMillis(long runMillis); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java index b0391b4..ad8c283 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.apex; +import org.apache.beam.runners.apex.ApexRunner.CreateApexPCollectionView; import org.apache.beam.runners.apex.translators.CreateValuesTranslator; import org.apache.beam.runners.apex.translators.FlattenPCollectionTranslator; import org.apache.beam.runners.apex.translators.GroupByKeyTranslator; @@ -35,8 +36,8 @@ import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -71,6 +72,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { registerTransformTranslator(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator()); registerTransformTranslator(Create.Values.class, new CreateValuesTranslator()); + registerTransformTranslator(CreateApexPCollectionView.class, new CreatePCollectionViewTranslator()); } public ApexPipelineTranslator(TranslationContext translationContext) { @@ -98,7 +100,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { PTransform transform = node.getTransform(); TransformTranslator translator = getTransformTranslator(transform.getClass()); if (null == translator) { - throw new IllegalStateException( + throw new UnsupportedOperationException( "no translator registered for " + transform); } translationContext.setCurrentTransform(node); @@ -147,4 +149,17 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor { } + private static class CreatePCollectionViewTranslator<ElemT, ViewT> implements TransformTranslator<CreateApexPCollectionView<ElemT, ViewT>> + { + private static final long serialVersionUID = 1L; + + @Override + public void translate(CreateApexPCollectionView<ElemT, ViewT> transform, TranslationContext context) + { + PCollectionView<ViewT> view = transform.getView(); + context.addView(view); + LOG.debug("view {}", view.getName()); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index 5fa3f23..ae79a20 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -19,20 +19,36 @@ package org.apache.beam.runners.apex; import static com.google.common.base.Preconditions.checkArgument; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + import org.apache.beam.runners.apex.translators.TranslationContext; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.ListCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; +import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.transforms.OldDoFn.ProcessContext; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.runners.core.AssignWindows; +import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.hadoop.conf.Configuration; @@ -55,6 +71,13 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { private final ApexPipelineOptions options; + /** + * TODO: this isn't thread sa + * Holds any most resent assertion error that was raised while processing elements. + * Used in the unit test driver in embedded to propagate the exception. + */ + public static volatile AssertionError assertionError; + public ApexRunner(ApexPipelineOptions options) { this.options = options; } @@ -77,6 +100,32 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { input.getPipeline(), WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED); +// TODO: replace this with a mapping + } else if (Combine.GloballyAsSingletonView.class.equals(transform.getClass())) { + PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingCombineGloballyAsSingletonView<InputT, OutputT>(this, + (Combine.GloballyAsSingletonView)transform); + return Pipeline.applyTransform(input, customTransform); + } else if (View.AsSingleton.class.equals(transform.getClass())) { + // note this assumes presence of above Combine.GloballyAsSingletonView mapping + PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsSingleton<InputT>(this, + (View.AsSingleton)transform); + return Pipeline.applyTransform(input, customTransform); + } else if (View.AsIterable.class.equals(transform.getClass())) { + PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsIterable<InputT>(this, + (View.AsIterable)transform); + return Pipeline.applyTransform(input, customTransform); + } else if (View.AsList.class.equals(transform.getClass())) { + PTransform<InputT, OutputT> customTransform = (PTransform)new StreamingViewAsList<InputT>(this, + (View.AsList)transform); + return Pipeline.applyTransform(input, customTransform); + } else if (View.AsMap.class.equals(transform.getClass())) { + PTransform<InputT, OutputT> customTransform = new StreamingViewAsMap(this, + (View.AsMap)transform); + return Pipeline.applyTransform(input, customTransform); + } else if (View.AsMultimap.class.equals(transform.getClass())) { + PTransform<InputT, OutputT> customTransform = new StreamingViewAsMultimap(this, + (View.AsMultimap)transform); + return Pipeline.applyTransform(input, customTransform); } else { return super.apply(transform, input); } @@ -109,10 +158,19 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { // turns off timeout checking for operator progress lc.setHeartbeatMonitoringEnabled(false); } + assertionError = null; + lc.runAsync(); if (options.getRunMillis() > 0) { - lc.run(options.getRunMillis()); - } else { - lc.runAsync(); + try { + long timeout = System.currentTimeMillis() + options.getRunMillis(); + while (System.currentTimeMillis() < timeout) { + if (assertionError != null) { + throw assertionError; + } + } + } finally { + lc.shutdown(); + } } return new ApexRunnerResult(lma.getDAG(), lc); } catch (Exception e) { @@ -158,10 +216,343 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } private static class IdentityFn<T> extends DoFn<T, T> { + private static final long serialVersionUID = 1L; @ProcessElement public void processElement(ProcessContext c) { c.output(c.element()); } } +//////////////////////////////////////////// +// Adapted from FlinkRunner for View support + + /** + * Records that the {@link PTransform} requires a deterministic key coder. + */ + private void recordViewUsesNonDeterministicKeyCoder(PTransform<?, ?> ptransform) { + throw new UnsupportedOperationException(); + } + + /** + * Creates a primitive {@link PCollectionView}. + * + * <p>For internal use only by runner implementors. + * + * @param <ElemT> The type of the elements of the input PCollection + * @param <ViewT> The type associated with the {@link PCollectionView} used as a side input + */ + public static class CreateApexPCollectionView<ElemT, ViewT> + extends PTransform<PCollection<List<ElemT>>, PCollectionView<ViewT>> { + private PCollectionView<ViewT> view; + + private CreateApexPCollectionView(PCollectionView<ViewT> view) { + this.view = view; + } + + public static <ElemT, ViewT> CreateApexPCollectionView<ElemT, ViewT> of( + PCollectionView<ViewT> view) { + return new CreateApexPCollectionView<>(view); + } + + public PCollectionView<ViewT> getView() { + return view; + } + + @Override + public PCollectionView<ViewT> apply(PCollection<List<ElemT>> input) { + return view; + } + } + + private static class WrapAsList<T> extends OldDoFn<T, List<T>> { + @Override + public void processElement(ProcessContext c) { + c.output(Arrays.asList(c.element())); + } + } + + private static class StreamingCombineGloballyAsSingletonView<InputT, OutputT> + extends PTransform<PCollection<InputT>, PCollectionView<OutputT>> + { + Combine.GloballyAsSingletonView<InputT, OutputT> transform; + + /** + * Builds an instance of this class from the overridden transform. + */ + public StreamingCombineGloballyAsSingletonView(ApexRunner runner, + Combine.GloballyAsSingletonView<InputT, OutputT> transform) + { + this.transform = transform; + } + + @Override + public PCollectionView<OutputT> apply(PCollection<InputT> input) + { + PCollection<OutputT> combined = input + .apply(Combine.globally(transform.getCombineFn()).withoutDefaults().withFanout(transform.getFanout())); + + PCollectionView<OutputT> view = PCollectionViews.singletonView(combined.getPipeline(), + combined.getWindowingStrategy(), transform.getInsertDefault(), + transform.getInsertDefault() ? transform.getCombineFn().defaultValue() : null, combined.getCoder()); + return combined.apply(ParDo.of(new WrapAsList<OutputT>())) + .apply(CreateApexPCollectionView.<OutputT, OutputT> of(view)); + } + + @Override + protected String getKindString() + { + return "StreamingCombineGloballyAsSingletonView"; + } + } + + private static class StreamingViewAsSingleton<T> extends PTransform<PCollection<T>, PCollectionView<T>> + { + private static final long serialVersionUID = 1L; + private View.AsSingleton<T> transform; + + public StreamingViewAsSingleton(ApexRunner runner, View.AsSingleton<T> transform) + { + this.transform = transform; + } + + @Override + public PCollectionView<T> apply(PCollection<T> input) + { + Combine.Globally<T, T> combine = Combine + .globally(new SingletonCombine<>(transform.hasDefaultValue(), transform.defaultValue())); + if (!transform.hasDefaultValue()) { + combine = combine.withoutDefaults(); + } + return input.apply(combine.asSingletonView()); + } + + @Override + protected String getKindString() + { + return "StreamingViewAsSingleton"; + } + + private static class SingletonCombine<T> extends Combine.BinaryCombineFn<T> + { + private boolean hasDefaultValue; + private T defaultValue; + + SingletonCombine(boolean hasDefaultValue, T defaultValue) + { + this.hasDefaultValue = hasDefaultValue; + this.defaultValue = defaultValue; + } + + @Override + public T apply(T left, T right) + { + throw new IllegalArgumentException("PCollection with more than one element " + + "accessed as a singleton view. Consider using Combine.globally().asSingleton() to " + + "combine the PCollection into a single value"); + } + + @Override + public T identity() + { + if (hasDefaultValue) { + return defaultValue; + } else { + throw new IllegalArgumentException("Empty PCollection accessed as a singleton view. " + + "Consider setting withDefault to provide a default value"); + } + } + } + } + + private static class StreamingViewAsMap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, V>>> { + + private final ApexRunner runner; + + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsMap(ApexRunner runner, View.AsMap<K, V> transform) { + this.runner = runner; + } + + @Override + public PCollectionView<Map<K, V>> apply(PCollection<KV<K, V>> input) { + PCollectionView<Map<K, V>> view = + PCollectionViews.mapView( + input.getPipeline(), + input.getWindowingStrategy(), + input.getCoder()); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + runner.recordViewUsesNonDeterministicKeyCoder(this); + } + + return input + .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) + .apply(CreateApexPCollectionView.<KV<K, V>, Map<K, V>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsMap"; + } + } + + /** + * Specialized expansion for {@link + * org.apache.beam.sdk.transforms.View.AsMultimap View.AsMultimap} for the + * Flink runner in streaming mode. + */ + private static class StreamingViewAsMultimap<K, V> + extends PTransform<PCollection<KV<K, V>>, PCollectionView<Map<K, Iterable<V>>>> { + + private final ApexRunner runner; + + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsMultimap(ApexRunner runner, View.AsMultimap<K, V> transform) { + this.runner = runner; + } + + @Override + public PCollectionView<Map<K, Iterable<V>>> apply(PCollection<KV<K, V>> input) { + PCollectionView<Map<K, Iterable<V>>> view = + PCollectionViews.multimapView( + input.getPipeline(), + input.getWindowingStrategy(), + input.getCoder()); + + @SuppressWarnings({"rawtypes", "unchecked"}) + KvCoder<K, V> inputCoder = (KvCoder) input.getCoder(); + try { + inputCoder.getKeyCoder().verifyDeterministic(); + } catch (Coder.NonDeterministicException e) { + runner.recordViewUsesNonDeterministicKeyCoder(this); + } + + return input + .apply(Combine.globally(new Concatenate<KV<K, V>>()).withoutDefaults()) + .apply(CreateApexPCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsMultimap"; + } + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsList View.AsList} for the + * Flink runner in streaming mode. + */ + private static class StreamingViewAsList<T> + extends PTransform<PCollection<T>, PCollectionView<List<T>>> { + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsList(ApexRunner runner, View.AsList<T> transform) {} + + @Override + public PCollectionView<List<T>> apply(PCollection<T> input) { + PCollectionView<List<T>> view = + PCollectionViews.listView( + input.getPipeline(), + input.getWindowingStrategy(), + input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateApexPCollectionView.<T, List<T>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsList"; + } + } + + /** + * Specialized implementation for + * {@link org.apache.beam.sdk.transforms.View.AsIterable View.AsIterable} for the + * Flink runner in streaming mode. + */ + private static class StreamingViewAsIterable<T> + extends PTransform<PCollection<T>, PCollectionView<Iterable<T>>> { + /** + * Builds an instance of this class from the overridden transform. + */ + @SuppressWarnings("unused") // used via reflection in FlinkRunner#apply() + public StreamingViewAsIterable(ApexRunner runner, View.AsIterable<T> transform) { } + + @Override + public PCollectionView<Iterable<T>> apply(PCollection<T> input) { + PCollectionView<Iterable<T>> view = + PCollectionViews.iterableView( + input.getPipeline(), + input.getWindowingStrategy(), + input.getCoder()); + + return input.apply(Combine.globally(new Concatenate<T>()).withoutDefaults()) + .apply(CreateApexPCollectionView.<T, Iterable<T>>of(view)); + } + + @Override + protected String getKindString() { + return "StreamingViewAsIterable"; + } + } + + /** + * Combiner that combines {@code T}s into a single {@code List<T>} containing all inputs. + * + * <p>For internal use by {@link StreamingViewAsMap}, {@link StreamingViewAsMultimap}, + * {@link StreamingViewAsList}, {@link StreamingViewAsIterable}. + * They require the input {@link PCollection} fits in memory. + * For a large {@link PCollection} this is expected to crash! + * + * @param <T> the type of elements to concatenate. + */ + private static class Concatenate<T> extends Combine.CombineFn<T, List<T>, List<T>> { + @Override + public List<T> createAccumulator() { + return new ArrayList<T>(); + } + + @Override + public List<T> addInput(List<T> accumulator, T input) { + accumulator.add(input); + return accumulator; + } + + @Override + public List<T> mergeAccumulators(Iterable<List<T>> accumulators) { + List<T> result = createAccumulator(); + for (List<T> accumulator : accumulators) { + result.addAll(accumulator); + } + return result; + } + + @Override + public List<T> extractOutput(List<T> accumulator) { + return accumulator; + } + + @Override + public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + + @Override + public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) { + return ListCoder.of(inputCoder); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java index e153867..712466a 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/FlattenPCollectionTranslator.java @@ -18,19 +18,23 @@ package org.apache.beam.runners.apex.translators; +import java.util.Collections; import java.util.List; +import org.apache.beam.runners.apex.translators.functions.ApexFlattenOperator; +import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; +import org.apache.beam.runners.apex.translators.io.ValuesSource; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.VoidCoder; +import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; -import com.datatorrent.lib.stream.StreamMerger; import com.google.common.collect.Lists; /** - * Flatten.FlattenPCollectionList translation to Apex operator. - * TODO: support more than two streams + * {@link Flatten.FlattenPCollectionList} translation to Apex operator. */ public class FlattenPCollectionTranslator<T> implements TransformTranslator<Flatten.FlattenPCollectionList<T>> { @@ -38,16 +42,28 @@ public class FlattenPCollectionTranslator<T> implements @Override public void translate(Flatten.FlattenPCollectionList<T> transform, TranslationContext context) { - PCollection<T> firstCollection = null; PCollectionList<T> input = context.getInput(); List<PCollection<T>> collections = input.getAll(); + + if (collections.isEmpty()) { + // create a dummy source that never emits anything + @SuppressWarnings("unchecked") + UnboundedSource<T, ?> unboundedSource = new ValuesSource<>(Collections.EMPTY_LIST, + (Coder<T>) VoidCoder.of()); + ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>( + unboundedSource, context.getPipelineOptions()); + context.addOperator(operator, operator.output); + return; + } + List<PCollection<T>> remainingCollections = Lists.newArrayList(); + PCollection<T> firstCollection = null; while (!collections.isEmpty()) { for (PCollection<T> collection : collections) { if (null == firstCollection) { firstCollection = collection; } else { - StreamMerger<T> operator = new StreamMerger<>(); + ApexFlattenOperator<T> operator = new ApexFlattenOperator<>(); context.addStream(firstCollection, operator.data1); context.addStream(collection, operator.data2); if (collections.size() > 2) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java index a958234..632829a 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslator.java @@ -18,11 +18,15 @@ package org.apache.beam.runners.apex.translators; +import java.util.List; + import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator; -import org.apache.beam.runners.apex.translators.utils.NoOpSideInputReader; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +import com.datatorrent.api.Operator; /** * {@link ParDo.Bound} is translated to Apex operator that wraps the {@link DoFn} @@ -35,9 +39,23 @@ public class ParDoBoundTranslator<InputT, OutputT> implements public void translate(ParDo.Bound<InputT, OutputT> transform, TranslationContext context) { OldDoFn<InputT, OutputT> doFn = transform.getFn(); PCollection<OutputT> output = context.getOutput(); + List<PCollectionView<?>> sideInputs = transform.getSideInputs(); ApexParDoOperator<InputT, OutputT> operator = new ApexParDoOperator<>(context.getPipelineOptions(), - doFn, output.getWindowingStrategy(), new NoOpSideInputReader()); + doFn, output.getWindowingStrategy(), sideInputs); context.addOperator(operator, operator.output); context.addStream(context.getInput(), operator.input); + if (!sideInputs.isEmpty()) { + Operator.InputPort<?>[] sideInputPorts = {operator.sideInput1}; + for (int i=0; i<sideInputs.size(); i++) { + // the number of input ports for side inputs are fixed and each port can only take one input. + // more (optional) ports can be added to give reasonable capacity or an explicit union operation introduced. + if (i == sideInputPorts.length) { + String msg = String.format("Too many side inputs in %s (currently only supporting %s).", + transform.toString(), sideInputPorts.length); + throw new UnsupportedOperationException(msg); + } + context.addStream(context.getViewInput(sideInputs.get(i)), sideInputPorts[i]); + } + } } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java index ab7cd0a..163cfd4 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/TranslationContext.java @@ -28,6 +28,7 @@ import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -54,6 +55,17 @@ public class TranslationContext { private AppliedPTransform<?, ?, ?> currentTransform; private final Map<PCollection, Pair<OutputPort<?>, List<InputPort<?>>>> streams = new HashMap<>(); private final Map<String, Operator> operators = new HashMap<>(); + private final Map<PCollectionView<?>, PInput> viewInputs = new HashMap<>(); + + public void addView(PCollectionView<?> view) { + this.viewInputs.put(view, this.getInput()); + } + + public <InputT extends PInput> InputT getViewInput(PCollectionView<?> view) { + PInput input = this.viewInputs.get(view); + checkArgument(input != null, "unknown view " + view.getName()); + return (InputT)input; + } public TranslationContext(ApexPipelineOptions pipelineOptions) { this.pipelineOptions = pipelineOptions; @@ -102,7 +114,7 @@ public class TranslationContext { public void addStream(PInput input, InputPort inputPort) { Pair<OutputPort<?>, List<InputPort<?>>> stream = this.streams.get(input); - checkArgument(stream != null, "no upstream operator defined"); + checkArgument(stream != null, "no upstream operator defined for %s", input); stream.getRight().add(inputPort); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java new file mode 100644 index 0000000..ce27abb --- /dev/null +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexFlattenOperator.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex.translators.functions; + +import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; +import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple.WatermarkTuple; +import org.apache.beam.sdk.transforms.Flatten; +import org.apache.beam.sdk.util.WindowedValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + +/** + * Apex operator for Beam {@link Flatten.FlattenPCollectionList}. + */ +public class ApexFlattenOperator<InputT> extends BaseOperator +{ + private static final Logger LOG = LoggerFactory.getLogger(ApexFlattenOperator.class); + private boolean traceTuples = true; + + private long inputWM1; + private long inputWM2; + private long outputWM; + + /** + * Data input port 1. + */ + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() + { + /** + * Emits to port "out" + */ + @Override + public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) + { + if (tuple instanceof WatermarkTuple) { + WatermarkTuple<?> wmTuple = (WatermarkTuple<?>)tuple; + if (wmTuple.getTimestamp() > inputWM1) { + inputWM1 = wmTuple.getTimestamp(); + if (inputWM1 <= inputWM2) { + // move output watermark and emit it + outputWM = inputWM1; + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", outputWM); + } + out.emit(tuple); + } + } + return; + } + if (traceTuples) { + LOG.debug("\nemitting {}\n", tuple); + } + out.emit(tuple); + } + }; + + /** + * Data input port 2. + */ + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> data2 = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() + { + /** + * Emits to port "out" + */ + @Override + public void process(ApexStreamTuple<WindowedValue<InputT>> tuple) + { + if (tuple instanceof WatermarkTuple) { + WatermarkTuple<?> wmTuple = (WatermarkTuple<?>)tuple; + if (wmTuple.getTimestamp() > inputWM2) { + inputWM2 = wmTuple.getTimestamp(); + if (inputWM2 <= inputWM1) { + // move output watermark and emit it + outputWM = inputWM2; + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", outputWM); + } + out.emit(tuple); + } + } + return; + } + if (traceTuples) { + LOG.debug("\nemitting {}\n", tuple); + } + out.emit(tuple); + } + }; + + /** + * Output port. + */ + public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>> out = new DefaultOutputPort<ApexStreamTuple<WindowedValue<InputT>>>(); +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java index 29e1b32..5970f36 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexGroupByKeyOperator.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.apex.translators.functions; import java.io.IOException; import java.io.Serializable; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -33,6 +34,7 @@ import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOption import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; @@ -41,6 +43,7 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.KeyedWorkItem; import org.apache.beam.sdk.util.KeyedWorkItems; import org.apache.beam.sdk.util.TimerInternals; @@ -55,6 +58,8 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; @@ -79,16 +84,22 @@ import com.google.common.collect.Multimap; */ public class ApexGroupByKeyOperator<K, V> implements Operator { + private static final Logger LOG = LoggerFactory.getLogger(ApexGroupByKeyOperator.class); + private boolean traceTuples = true; + @Bind(JavaSerializer.class) private WindowingStrategy<V, BoundedWindow> windowingStrategy; @Bind(JavaSerializer.class) + private Coder<K> keyCoder; + @Bind(JavaSerializer.class) private Coder<V> valueCoder; @Bind(JavaSerializer.class) private final SerializablePipelineOptions serializedOptions; @Bind(JavaSerializer.class) - private Map<K, StateInternals<K>> perKeyStateInternals = new HashMap<>(); - private Map<K, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); +// TODO: InMemoryStateInternals not serializable +transient private Map<ByteBuffer, StateInternals<K>> perKeyStateInternals = new HashMap<>(); + private Map<ByteBuffer, Set<TimerInternals.TimerData>> activeTimers = new HashMap<>(); private transient ProcessContext context; private transient OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> fn; @@ -100,14 +111,19 @@ public class ApexGroupByKeyOperator<K, V> implements Operator @Override public void process(ApexStreamTuple<WindowedValue<KV<K, V>>> t) { - //System.out.println("\n***RECEIVED: " +t); try { if (t instanceof ApexStreamTuple.WatermarkTuple) { ApexStreamTuple.WatermarkTuple<?> mark = (ApexStreamTuple.WatermarkTuple<?>)t; processWatermark(mark); + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", mark.getTimestamp()); + } output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<KV<K, Iterable<V>>>>of(mark.getTimestamp())); return; } + if (traceTuples) { + LOG.debug("\ninput {}\n", t.getValue()); + } processElement(t.getValue()); } catch (Exception e) { Throwables.propagate(e); @@ -124,6 +140,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator Preconditions.checkNotNull(pipelineOptions); this.serializedOptions = new SerializablePipelineOptions(pipelineOptions); this.windowingStrategy = (WindowingStrategy<V, BoundedWindow>)input.getWindowingStrategy(); + this.keyCoder = ((KvCoder<K, V>)input.getCoder()).getKeyCoder(); this.valueCoder = ((KvCoder<K, V>)input.getCoder()).getValueCoder(); } @@ -146,6 +163,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator @Override public void setup(OperatorContext context) { + this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(serializedOptions.get(), this); StateInternalsFactory<K> stateInternalsFactory = new GroupByKeyStateInternalsFactory(); this.fn = GroupAlsoByWindowViaWindowSetDoFn.create(this.windowingStrategy, stateInternalsFactory, SystemReduceFn.<K, V, BoundedWindow>buffering(this.valueCoder)); @@ -163,16 +181,16 @@ public class ApexGroupByKeyOperator<K, V> implements Operator * We keep these timers in a Set, so that they are deduplicated, as the same * timer can be registered multiple times. */ - private Multimap<K, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) { + private Multimap<ByteBuffer, TimerInternals.TimerData> getTimersReadyToProcess(long currentWatermark) { // we keep the timers to return in a different list and launch them later // because we cannot prevent a trigger from registering another trigger, // which would lead to concurrent modification exception. - Multimap<K, TimerInternals.TimerData> toFire = HashMultimap.create(); + Multimap<ByteBuffer, TimerInternals.TimerData> toFire = HashMultimap.create(); - Iterator<Map.Entry<K, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator(); + Iterator<Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>>> it = activeTimers.entrySet().iterator(); while (it.hasNext()) { - Map.Entry<K, Set<TimerInternals.TimerData>> keyWithTimers = it.next(); + Map.Entry<ByteBuffer, Set<TimerInternals.TimerData>> keyWithTimers = it.next(); Iterator<TimerInternals.TimerData> timerIt = keyWithTimers.getValue().iterator(); while (timerIt.hasNext()) { @@ -205,44 +223,64 @@ public class ApexGroupByKeyOperator<K, V> implements Operator fn.processElement(context); } - private StateInternals<K> getStateInternalsForKey(K key) { - StateInternals<K> stateInternals = perKeyStateInternals.get(key); + private StateInternals<K> getStateInternalsForKey(K key) + { + final ByteBuffer keyBytes; + try { + keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key)); + } catch (CoderException e) { + throw Throwables.propagate(e); + } + StateInternals<K> stateInternals = perKeyStateInternals.get(keyBytes); if (stateInternals == null) { //Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); //OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn(); stateInternals = InMemoryStateInternals.forKey(key); - perKeyStateInternals.put(key, stateInternals); + perKeyStateInternals.put(keyBytes, stateInternals); } return stateInternals; } private void registerActiveTimer(K key, TimerInternals.TimerData timer) { - Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); + final ByteBuffer keyBytes; + try { + keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key)); + } catch (CoderException e) { + throw Throwables.propagate(e); + } + Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes); if (timersForKey == null) { timersForKey = new HashSet<>(); } timersForKey.add(timer); - activeTimers.put(key, timersForKey); + activeTimers.put(keyBytes, timersForKey); } private void unregisterActiveTimer(K key, TimerInternals.TimerData timer) { - Set<TimerInternals.TimerData> timersForKey = activeTimers.get(key); + final ByteBuffer keyBytes; + try { + keyBytes = ByteBuffer.wrap(CoderUtils.encodeToByteArray(keyCoder, key)); + } catch (CoderException e) { + throw Throwables.propagate(e); + } + Set<TimerInternals.TimerData> timersForKey = activeTimers.get(keyBytes); if (timersForKey != null) { timersForKey.remove(timer); if (timersForKey.isEmpty()) { - activeTimers.remove(key); + activeTimers.remove(keyBytes); } else { - activeTimers.put(key, timersForKey); + activeTimers.put(keyBytes, timersForKey); } } } private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) throws Exception { this.inputWatermark = new Instant(mark.getTimestamp()); - Multimap<K, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp()); + Multimap<ByteBuffer, TimerInternals.TimerData> timers = getTimersReadyToProcess(mark.getTimestamp()); if (!timers.isEmpty()) { - for (K key : timers.keySet()) { - KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(key)); + for (ByteBuffer keyBytes : timers.keySet()) { + K key = CoderUtils.decodeFromByteArray(keyCoder, keyBytes.array()); + KeyedWorkItem<K, V> kwi = KeyedWorkItems.<K, V>timersWorkItem(key, timers.get(keyBytes)); context.setElement(kwi, getStateInternalsForKey(kwi.key())); fn.processElement(context); } @@ -315,7 +353,9 @@ public class ApexGroupByKeyOperator<K, V> implements Operator @Override public void outputWindowedValue(KV<K, Iterable<V>> output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - System.out.println("\n***EMITTING: " + output + ", timestamp=" + timestamp); + if (traceTuples) { + LOG.debug("\nemitting {} timestamp {}\n", output, timestamp); + } ApexGroupByKeyOperator.this.output.emit(ApexStreamTuple.DataTuple.of(WindowedValue.of(output, timestamp, windows, pane))); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java index d358d14..13a8fc9 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/functions/ApexParDoOperator.java @@ -18,40 +18,58 @@ package org.apache.beam.runners.apex.translators.functions; +import java.util.ArrayList; +import java.util.List; + import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; import org.apache.beam.runners.apex.translators.utils.NoOpStepContext; import org.apache.beam.runners.apex.translators.utils.SerializablePipelineOptions; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; +import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.core.DoFnRunners.OutputManager; +import org.apache.beam.sdk.repackaged.com.google.common.base.Throwables; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Aggregator.AggregatorFactory; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.ExecutionContext; +import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.InMemoryStateInternals; +import org.apache.beam.sdk.util.state.StateInternals; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.common.util.BaseOperator; import com.esotericsoftware.kryo.serializers.FieldSerializer.Bind; +import com.google.common.collect.Iterables; import com.esotericsoftware.kryo.serializers.JavaSerializer; /** * Apex operator for Beam {@link DoFn}. */ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements OutputManager { + private static final Logger LOG = LoggerFactory.getLogger(ApexParDoOperator.class); + private boolean traceTuples = true; private transient final TupleTag<OutputT> mainTag = new TupleTag<OutputT>(); - private transient DoFnRunner<InputT, OutputT> doFnRunner; + private transient PushbackSideInputDoFnRunner<InputT, OutputT> pushbackDoFnRunner; @Bind(JavaSerializer.class) private final SerializablePipelineOptions pipelineOptions; @@ -60,17 +78,28 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements @Bind(JavaSerializer.class) private final WindowingStrategy<?, ?> windowingStrategy; @Bind(JavaSerializer.class) - private final SideInputReader sideInputReader; + List<PCollectionView<?>> sideInputs; +// TODO: not Kryo serializable, integrate codec +//@Bind(JavaSerializer.class) +private transient StateInternals<Void> sideInputStateInternals = InMemoryStateInternals.forKey(null); + private transient SideInputHandler sideInputHandler; + // TODO: not Kryo serializable, integrate codec + private List<WindowedValue<InputT>> pushedBack = new ArrayList<>(); + private LongMin pushedBackWatermark = new LongMin(); + private long currentInputWatermark = Long.MIN_VALUE; + private long currentOutputWatermark = currentInputWatermark; public ApexParDoOperator( ApexPipelineOptions pipelineOptions, OldDoFn<InputT, OutputT> doFn, WindowingStrategy<?, ?> windowingStrategy, - SideInputReader sideInputReader) { + List<PCollectionView<?>> sideInputs + ) + { this.pipelineOptions = new SerializablePipelineOptions(pipelineOptions); this.doFn = doFn; this.windowingStrategy = windowingStrategy; - this.sideInputReader = sideInputReader; + this.sideInputs = sideInputs; } @SuppressWarnings("unused") // for Kryo @@ -78,17 +107,60 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements this(null, null, null, null); } + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>> input = new DefaultInputPort<ApexStreamTuple<WindowedValue<InputT>>>() { @Override public void process(ApexStreamTuple<WindowedValue<InputT>> t) { if (t instanceof ApexStreamTuple.WatermarkTuple) { - output.emit(t); + processWatermark((ApexStreamTuple.WatermarkTuple<?>)t); } else { - System.out.println("\n" + Thread.currentThread().getName() + "\n" + t.getValue() + "\n"); - doFnRunner.processElement(t.getValue()); + if (traceTuples) { + LOG.debug("\ninput {}\n", t.getValue()); + } + Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(t.getValue()); + for (WindowedValue<InputT> pushedBackValue : justPushedBack) { + pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis()); + pushedBack.add(pushedBackValue); + } + } + } + }; + + @InputPortFieldAnnotation(optional=true) + public final transient DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>> sideInput1 = new DefaultInputPort<ApexStreamTuple<WindowedValue<Iterable<?>>>>() + { + private final int sideInputIndex = 0; + + @Override + public void process(ApexStreamTuple<WindowedValue<Iterable<?>>> t) + { + if (t instanceof ApexStreamTuple.WatermarkTuple) { + // ignore side input watermarks + return; + } + if (traceTuples) { + LOG.debug("\nsideInput {}\n", t.getValue()); } + PCollectionView<?> sideInput = sideInputs.get(sideInputIndex); + sideInputHandler.addSideInputValue(sideInput, t.getValue()); + + List<WindowedValue<InputT>> newPushedBack = new ArrayList<>(); + for (WindowedValue<InputT> elem : pushedBack) { + Iterable<WindowedValue<InputT>> justPushedBack = processElementInReadyWindows(elem); + Iterables.addAll(newPushedBack, justPushedBack); + } + + pushedBack.clear(); + pushedBackWatermark.clear(); + for (WindowedValue<InputT> pushedBackValue : newPushedBack) { + pushedBackWatermark.add(pushedBackValue.getTimestamp().getMillis()); + pushedBack.add(pushedBackValue); + } + + // potentially emit watermark + processWatermark(ApexStreamTuple.WatermarkTuple.of(currentInputWatermark)); } }; @@ -99,27 +171,82 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) { output.emit(ApexStreamTuple.DataTuple.of(tuple)); + if (traceTuples) { + LOG.debug("\nemitting {}\n", tuple); + } + } + + private Iterable<WindowedValue<InputT>> processElementInReadyWindows(WindowedValue<InputT> elem) { + try { + return pushbackDoFnRunner.processElementInReadyWindows(elem); + } catch (UserCodeException ue) { + if (ue.getCause() instanceof AssertionError) { + ApexRunner.assertionError = (AssertionError)ue.getCause(); + } + throw ue; + } + } + + private void processWatermark(ApexStreamTuple.WatermarkTuple<?> mark) + { + this.currentInputWatermark = mark.getTimestamp(); + + if (sideInputs.isEmpty()) { + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", mark); + } + output.emit(mark); + return; + } + + long potentialOutputWatermark = + Math.min(pushedBackWatermark.get(), currentInputWatermark); + if (potentialOutputWatermark > currentOutputWatermark) { + currentOutputWatermark = potentialOutputWatermark; + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", currentOutputWatermark); + } + output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark)); + } } @Override public void setup(OperatorContext context) { - this.doFnRunner = DoFnRunners.simpleRunner(pipelineOptions.get(), + this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this); + SideInputReader sideInputReader = NullSideInputReader.of(sideInputs); + if (!sideInputs.isEmpty()) { + sideInputHandler = new SideInputHandler(sideInputs, sideInputStateInternals); + sideInputReader = sideInputHandler; + } + + DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.createDefault( + pipelineOptions.get(), doFn, sideInputReader, this, mainTag, - TupleTagList.empty().getAll(), + TupleTagList.empty().getAll() /*sideOutputTags*/, new NoOpStepContext(), new NoOpAggregatorFactory(), windowingStrategy ); + + pushbackDoFnRunner = + PushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); + + try { + doFn.setup(); + } catch (Exception e) { + Throwables.propagate(e); + } + } @Override public void beginWindow(long windowId) { - doFnRunner.startBundle(); + pushbackDoFnRunner.startBundle(); /* Collection<Aggregator<?, ?>> aggregators = AggregatorRetriever.getAggregators(doFn); if (!aggregators.isEmpty()) { @@ -131,14 +258,14 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements @Override public void endWindow() { - doFnRunner.finishBundle(); + pushbackDoFnRunner.finishBundle(); } /** * TODO: Placeholder for aggregation, to be implemented for embedded and cluster mode. * It is called from {@link org.apache.beam.sdk.util.SimpleDoFnRunner}. */ - public class NoOpAggregatorFactory implements AggregatorFactory { + public static class NoOpAggregatorFactory implements AggregatorFactory { private NoOpAggregatorFactory() { } @@ -147,31 +274,52 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( Class<?> fnClass, ExecutionContext.StepContext step, String name, CombineFn<InputT, AccumT, OutputT> combine) { - return new Aggregator<InputT, OutputT>() { + return new NoOpAggregator<InputT, OutputT>(); + } - @Override - public void addValue(InputT value) - { - } + private static class NoOpAggregator<InputT, OutputT> implements Aggregator<InputT, OutputT>, java.io.Serializable + { + private static final long serialVersionUID = 1L; - @Override - public String getName() - { - // TODO Auto-generated method stub - return null; - } + @Override + public void addValue(InputT value) + { + } + + @Override + public String getName() + { + // TODO Auto-generated method stub + return null; + } + + @Override + public CombineFn<InputT, ?, OutputT> getCombineFn() + { + // TODO Auto-generated method stub + return null; + } + + }; - @Override - public CombineFn<InputT, ?, OutputT> getCombineFn() - { - // TODO Auto-generated method stub - return null; - } - }; - } } + private static class LongMin { + long state = Long.MAX_VALUE; + public void add(long l) { + state = Math.min(state, l); + } + + public long get() { + return state; + } + + public void clear() { + state = Long.MAX_VALUE; + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java index 39114fe..6ee82ea 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java @@ -28,8 +28,11 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.google.common.base.Throwables; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; @@ -40,10 +43,14 @@ import com.esotericsoftware.kryo.serializers.JavaSerializer; import java.io.IOException; /** - * Apex input operator that wraps Beam UnboundedSource. + * Apex input operator that wraps Beam {@link UnboundedSource}. */ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> implements InputOperator { + private static final Logger LOG = LoggerFactory.getLogger( + ApexReadUnboundedInputOperator.class); + private boolean traceTuples = false; + private long outputWatermark = 0; @Bind(JavaSerializer.class) private final SerializablePipelineOptions pipelineOptions; @@ -51,6 +58,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb private final UnboundedSource<OutputT, CheckpointMarkT> source; private transient UnboundedSource.UnboundedReader<OutputT> reader; private transient boolean available = false; + @OutputPortFieldAnnotation(optional=true) public final transient DefaultOutputPort<ApexStreamTuple<WindowedValue<OutputT>>> output = new DefaultOutputPort<>(); public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source, ApexPipelineOptions options) { @@ -66,12 +74,23 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb @Override public void beginWindow(long windowId) { - Instant mark = reader.getWatermark(); - output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark.getMillis())); if (!available && source instanceof ValuesSource) { - // if it's a Create transformation and the input was consumed, + // if it's a Create and the input was consumed, emit final watermark + emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis()); // terminate the stream (allows tests to finish faster) BaseOperator.shutdown(); + } else { + emitWatermarkIfNecessary(reader.getWatermark().getMillis()); + } + } + + private void emitWatermarkIfNecessary(long mark) { + if (mark > outputWatermark) { + outputWatermark = mark; + if (traceTuples) { + LOG.debug("\nemitting watermark {}\n", mark); + } + output.emit(ApexStreamTuple.WatermarkTuple.<WindowedValue<OutputT>>of(mark)); } } @@ -83,6 +102,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb @Override public void setup(OperatorContext context) { + this.traceTuples = ApexStreamTuple.Logging.isDebugEnabled(pipelineOptions.get(), this); try { reader = source.createReader(this.pipelineOptions.get(), null); available = reader.start(); @@ -114,6 +134,9 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT extends Unb OutputT data = reader.getCurrent(); Instant timestamp = reader.getCurrentTimestamp(); available = reader.advance(); + if (traceTuples) { + LOG.debug("\nemitting {}\n", data); + } output.emit(DataTuple.of(WindowedValue.of( data, timestamp, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING))); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java index efb69ee..06940aa 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/ApexStreamTuple.java @@ -27,10 +27,13 @@ import java.io.OutputStream; import java.util.Arrays; import java.util.List; +import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; import org.apache.beam.sdk.coders.StandardCoder; +import com.datatorrent.api.Operator; + public interface ApexStreamTuple<T> { /** @@ -188,4 +191,12 @@ public interface ApexStreamTuple<T> } + final class Logging + { + public static boolean isDebugEnabled(ApexPipelineOptions options, Operator operator) + { + return options.isTupleTracingEnabled(); + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java deleted file mode 100644 index ffe1a29..0000000 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/utils/NoOpSideInputReader.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.apex.translators.utils; - -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.SideInputReader; -import org.apache.beam.sdk.values.PCollectionView; - -import java.io.Serializable; - -import javax.annotation.Nullable; - -/** - * no-op side input reader. - */ -public class NoOpSideInputReader implements SideInputReader, Serializable { - @Nullable - @Override - public <T> T get(PCollectionView<T> view, BoundedWindow window) { - return null; - } - - @Override - public <T> boolean contains(PCollectionView<T> view) { - return false; - } - - @Override - public boolean isEmpty() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java new file mode 100644 index 0000000..3573d31 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTest.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.beam.runners.apex.examples; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.beam.runners.apex.ApexPipelineOptions; +import org.apache.beam.runners.apex.TestApexRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.io.Read; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.runners.dataflow.TestCountingSource; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * For debugging only. + */ +@Ignore +@RunWith(JUnit4.class) +public class IntTest implements java.io.Serializable +{ + + @Test + public void test() + { + ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); + options.setTupleTracingEnabled(true); + options.setRunner(TestApexRunner.class); + Pipeline p = Pipeline.create(options); +boolean timeBound = false; + + + TestCountingSource source = new TestCountingSource(Integer.MAX_VALUE).withoutSplitting(); +//List<KV<Integer,Integer>> values = Lists.newArrayList( +// KV.of(0, 99),KV.of(0, 99),KV.of(0, 98)); + +//UnboundedSource<KV<Integer,Integer>, ?> source = new ValuesSource<>(values, +// KvCoder.of(VarIntCoder.of(), VarIntCoder.of())); + + if (true) { + source = source.withDedup(); + } + + PCollection<KV<Integer, Integer>> output = + timeBound + ? p.apply(Read.from(source).withMaxReadTime(Duration.millis(200))) + : p.apply(Read.from(source).withMaxNumRecords(NUM_RECORDS)); + + List<KV<Integer, Integer>> expectedOutput = new ArrayList<>(); + for (int i = 0; i < NUM_RECORDS; i++) { + expectedOutput.add(KV.of(0, i)); + } + + // Because some of the NUM_RECORDS elements read are dupes, the final output + // will only have output from 0 to n where n < NUM_RECORDS. + PAssert.that(output).satisfies(new Checker(true, timeBound)); + + + p.run(); + return; + } + + private static final int NUM_RECORDS = 10; + private static class Checker implements SerializableFunction<Iterable<KV<Integer, Integer>>, Void> + { + private final boolean dedup; + private final boolean timeBound; + + Checker(boolean dedup, boolean timeBound) + { + this.dedup = dedup; + this.timeBound = timeBound; + } + + @Override + public Void apply(Iterable<KV<Integer, Integer>> input) + { + List<Integer> values = new ArrayList<>(); + for (KV<Integer, Integer> kv : input) { + assertEquals(0, (int)kv.getKey()); + values.add(kv.getValue()); + } + if (timeBound) { + assertTrue(values.size() >= 1); + } else if (dedup) { + // Verify that at least some data came through. The chance of 90% of the input + // being duplicates is essentially zero. + assertTrue(values.size() > NUM_RECORDS / 10 && values.size() <= NUM_RECORDS); + } else { + assertEquals(NUM_RECORDS, values.size()); + } + Collections.sort(values); + for (int i = 0; i < values.size(); i++) { + assertEquals(i, (int)values.get(i)); + } + //if (finalizeTracker != null) { + // assertThat(finalizeTracker, containsInAnyOrder(values.size() - 1)); + //} + return null; + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java deleted file mode 100644 index 0ee3442..0000000 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/IntTests.java +++ /dev/null @@ -1,207 +0,0 @@ - /* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.beam.runners.apex.examples; - - - import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; - import static org.hamcrest.Matchers.is; - import static org.junit.Assert.assertThat; - -import org.apache.beam.runners.apex.ApexPipelineOptions; -import org.apache.beam.runners.apex.TestApexRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.CountingInput; -import org.apache.beam.sdk.io.CountingInput.UnboundedCountingInput; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.testing.NeedsRunner; - import org.apache.beam.sdk.testing.PAssert; - import org.apache.beam.sdk.testing.RunnableOnService; - import org.apache.beam.sdk.testing.TestPipeline; - import org.apache.beam.sdk.transforms.Count; - import org.apache.beam.sdk.transforms.DoFn; - import org.apache.beam.sdk.transforms.Max; - import org.apache.beam.sdk.transforms.Min; - import org.apache.beam.sdk.transforms.PTransform; - import org.apache.beam.sdk.transforms.ParDo; - import org.apache.beam.sdk.transforms.RemoveDuplicates; - import org.apache.beam.sdk.transforms.SerializableFunction; - import org.apache.beam.sdk.transforms.display.DisplayData; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.PCollection; - import org.joda.time.Duration; - import org.joda.time.Instant; - import org.junit.Test; - import org.junit.experimental.categories.Category; - import org.junit.runner.RunWith; - import org.junit.runners.JUnit4; - - /** - * Tests for {@link CountingInput}. - */ - @RunWith(JUnit4.class) - public class IntTests { - public static void addCountingAsserts(PCollection<Long> input, long numElements) { - // Count == numElements - PAssert.thatSingleton(input.apply("Count", Count.<Long>globally())) - .isEqualTo(numElements); - // Unique count == numElements - PAssert.thatSingleton( - input - .apply(RemoveDuplicates.<Long>create()) - .apply("UniqueCount", Count.<Long>globally())) - .isEqualTo(numElements); - // Min == 0 - PAssert.thatSingleton(input.apply("Min", Min.<Long>globally())).isEqualTo(0L); - // Max == numElements-1 - PAssert.thatSingleton(input.apply("Max", Max.<Long>globally())) - .isEqualTo(numElements - 1); - } - - @Test - @Category(RunnableOnService.class) - public void testBoundedInput() { - //Pipeline p = TestPipeline.create(); - ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); - options.setRunner(TestApexRunner.class); - Pipeline p = Pipeline.create(options); - - long numElements = 1000; - PCollection<Long> input = p.apply(CountingInput.upTo(numElements)); - - addCountingAsserts(input, numElements); - p.run(); - } - - @Test - public void testBoundedDisplayData() { - PTransform<?, ?> input = CountingInput.upTo(1234); - DisplayData displayData = DisplayData.from(input); - assertThat(displayData, hasDisplayItem("upTo", 1234)); - } - - @Test - @Category(RunnableOnService.class) - public void testUnboundedInput() { - //Pipeline p = TestPipeline.create(); - ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class); - options.setRunner(TestApexRunner.class); - Pipeline p = Pipeline.create(options); - - - long numElements = 1000; - - PCollection<Long> input = p.apply(CountingInput.unbounded().withMaxNumRecords(numElements)); - -// input = input.apply(Window.<Long>into(FixedWindows.of(Duration.standardSeconds(10)))); - - addCountingAsserts(input, numElements); - p.run(); - } - - @Test - @Category(NeedsRunner.class) - public void testUnboundedInputRate() { - Pipeline p = TestPipeline.create(); - long numElements = 5000; - - long elemsPerPeriod = 10L; - Duration periodLength = Duration.millis(8); - PCollection<Long> input = - p.apply( - CountingInput.unbounded() - .withRate(elemsPerPeriod, periodLength) - .withMaxNumRecords(numElements)); - - addCountingAsserts(input, numElements); - long expectedRuntimeMillis = (periodLength.getMillis() * numElements) / elemsPerPeriod; - Instant startTime = Instant.now(); - p.run(); - Instant endTime = Instant.now(); - assertThat(endTime.isAfter(startTime.plus(expectedRuntimeMillis)), is(true)); - } - - private static class ElementValueDiff extends DoFn<Long, Long> { - @ProcessElement - public void processElement(ProcessContext c) throws Exception { - c.output(c.element() - c.timestamp().getMillis()); - } - } - - @Test - @Category(RunnableOnService.class) - public void testUnboundedInputTimestamps() { - Pipeline p = TestPipeline.create(); - long numElements = 1000; - - PCollection<Long> input = - p.apply( - CountingInput.unbounded() - .withTimestampFn(new ValueAsTimestampFn()) - .withMaxNumRecords(numElements)); - addCountingAsserts(input, numElements); - - PCollection<Long> diffs = - input - .apply("TimestampDiff", ParDo.of(new ElementValueDiff())) - .apply("RemoveDuplicateTimestamps", RemoveDuplicates.<Long>create()); - // This assert also confirms that diffs only has one unique value. - PAssert.thatSingleton(diffs).isEqualTo(0L); - - p.run(); - } - - @Test - public void testUnboundedDisplayData() { - Duration maxReadTime = Duration.standardHours(5); - SerializableFunction<Long, Instant> timestampFn = new SerializableFunction<Long, Instant>() { - @Override - public Instant apply(Long input) { - return Instant.now(); - } - }; - - PTransform<?, ?> input = CountingInput.unbounded() - .withMaxNumRecords(1234) - .withMaxReadTime(maxReadTime) - .withTimestampFn(timestampFn); - - DisplayData displayData = DisplayData.from(input); - - assertThat(displayData, hasDisplayItem("maxRecords", 1234)); - assertThat(displayData, hasDisplayItem("maxReadTime", maxReadTime)); - assertThat(displayData, hasDisplayItem("timestampFn", timestampFn.getClass())); - } - - /** - * A timestamp function that uses the given value as the timestamp. Because the input values will - * not wrap, this function is non-decreasing and meets the timestamp function criteria laid out - * in {@link UnboundedCountingInput#withTimestampFn(SerializableFunction)}. - */ - private static class ValueAsTimestampFn implements SerializableFunction<Long, Instant> { - @Override - public Instant apply(Long input) { - return new Instant(input); - } - } - - - - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java index 06aaf55..6239021 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java @@ -20,20 +20,25 @@ package org.apache.beam.runners.apex.translators; import org.apache.beam.runners.apex.ApexPipelineOptions; import org.apache.beam.runners.apex.ApexRunnerResult; +import org.apache.beam.runners.apex.TestApexRunner; import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.translators.functions.ApexParDoOperator; import org.apache.beam.runners.apex.translators.io.ApexReadUnboundedInputOperator; +import org.apache.beam.runners.apex.translators.utils.ApexStreamTuple; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.SerializableCoder; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import com.datatorrent.api.DAG; +import com.datatorrent.lib.util.KryoCloneUtils; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -48,6 +53,7 @@ import org.slf4j.LoggerFactory; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.regex.Pattern; @@ -123,13 +129,11 @@ public class ParDoBoundTranslatorTest { } } - - @Ignore @Test public void testAssertionFailure() throws Exception { ApexPipelineOptions options = PipelineOptionsFactory.create() .as(ApexPipelineOptions.class); - options.setRunner(ApexRunner.class); + options.setRunner(TestApexRunner.class); Pipeline pipeline = Pipeline.create(options); PCollection<Integer> pcollection = pipeline @@ -149,6 +153,16 @@ public class ParDoBoundTranslatorTest { expectedPattern.matcher(exc.getMessage()).find()); } + @Test + public void testContainsInAnyOrder() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class); + options.setRunner(TestApexRunner.class); + Pipeline pipeline = Pipeline.create(options); + PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4)); + PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3); + pipeline.run(); + } + private static Throwable runExpectingAssertionFailure(Pipeline pipeline) { // We cannot use thrown.expect(AssertionError.class) because the AssertionError // is first caught by JUnit and causes a test failure. @@ -161,4 +175,19 @@ public class ParDoBoundTranslatorTest { throw new RuntimeException("unreachable"); } + @Test + public void testSerialization() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create() + .as(ApexPipelineOptions.class); + ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options, + new Add(0), WindowingStrategy.globalDefault(), Collections.<PCollectionView<?>> emptyList()); + operator.setup(null); + operator.beginWindow(0); + WindowedValue<Integer> wv = WindowedValue.valueInGlobalWindow(0); + operator.input.process(ApexStreamTuple.DataTuple.of(wv)); + operator.input.process(ApexStreamTuple.WatermarkTuple.<WindowedValue<Integer>>of(0)); + operator.endWindow(); + Assert.assertNotNull("Serialization", KryoCloneUtils.cloneObject(operator)); + + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09754942/runners/apex/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/resources/log4j.properties b/runners/apex/src/test/resources/log4j.properties index 84a6f68..c0efc5d 100644 --- a/runners/apex/src/test/resources/log4j.properties +++ b/runners/apex/src/test/resources/log4j.properties @@ -26,8 +26,8 @@ log4j.appender.testlogger.target = System.err log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n -log4j.logger.org=info -#log4j.logger.org.apache.commons.beanutils=warn +log4j.logger.org=debug +log4j.logger.org.apache.commons.beanutils=warn log4j.logger.com.datatorrent=info log4j.logger.org.apache.apex=debug log4j.logger.org.apache.beam.runners.apex=debug