Remove Orderdness of Input, Output expansions This brings the PInput and POutput expansion signatures back in line with the Runner API.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0e5737fd Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0e5737fd Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0e5737fd Branch: refs/heads/master Commit: 0e5737fdbee5478ee7f39c4b1a1ac95353ec7b08 Parents: a5a5bf9 Author: Thomas Groh <tg...@google.com> Authored: Tue Apr 4 16:51:55 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Tue Apr 11 12:40:46 2017 -0700 ---------------------------------------------------------------------- .../FlattenPCollectionTranslator.java | 13 +- .../apex/translation/ParDoTranslator.java | 13 +- .../apex/translation/TranslationContext.java | 10 +- .../DeduplicatedFlattenFactory.java | 18 ++- .../EmptyFlattenAsCreateFactory.java | 7 +- .../core/construction/PTransformMatchers.java | 5 +- .../core/construction/PrimitiveCreate.java | 7 +- .../core/construction/ReplacementOutputs.java | 63 ++++----- .../SingleInputOutputOverrideFactory.java | 11 +- .../UnsupportedOverrideFactory.java | 8 +- .../DeduplicatedFlattenFactoryTest.java | 6 +- .../EmptyFlattenAsCreateFactoryTest.java | 8 +- .../construction/PTransformMatchersTest.java | 132 +++++++++---------- .../construction/ReplacementOutputsTest.java | 109 ++------------- .../SingleInputOutputOverrideFactoryTest.java | 6 +- .../UnsupportedOverrideFactoryTest.java | 7 +- .../direct/BoundedReadEvaluatorFactory.java | 2 +- .../beam/runners/direct/DirectGraphVisitor.java | 5 +- .../direct/ExecutorServiceParallelExecutor.java | 4 +- .../runners/direct/FlattenEvaluatorFactory.java | 2 +- .../GroupAlsoByWindowEvaluatorFactory.java | 2 +- .../direct/GroupByKeyOnlyEvaluatorFactory.java | 4 +- .../direct/KeyedPValueTrackingVisitor.java | 14 +- .../runners/direct/ParDoEvaluatorFactory.java | 10 +- .../direct/ParDoMultiOverrideFactory.java | 9 +- .../direct/StatefulParDoEvaluatorFactory.java | 8 +- .../direct/TestStreamEvaluatorFactory.java | 8 +- .../direct/UnboundedReadEvaluatorFactory.java | 4 +- .../runners/direct/ViewEvaluatorFactory.java | 4 +- .../runners/direct/ViewOverrideFactory.java | 9 +- .../beam/runners/direct/WatermarkManager.java | 19 +-- .../runners/direct/WindowEvaluatorFactory.java | 2 +- .../direct/WriteWithShardingFactory.java | 10 +- .../runners/direct/DirectGraphVisitorTest.java | 7 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 2 +- .../StatefulParDoEvaluatorFactoryTest.java | 2 +- .../direct/TestStreamEvaluatorFactoryTest.java | 5 +- .../runners/direct/ViewOverrideFactoryTest.java | 2 +- .../flink/FlinkBatchTransformTranslators.java | 36 ++--- .../flink/FlinkBatchTranslationContext.java | 11 +- .../flink/FlinkStreamingPipelineTranslator.java | 9 +- .../FlinkStreamingTransformTranslators.java | 32 ++--- .../flink/FlinkStreamingTranslationContext.java | 12 +- .../dataflow/BatchStatefulParDoOverrides.java | 15 +-- .../dataflow/DataflowPipelineTranslator.java | 20 +-- .../beam/runners/dataflow/DataflowRunner.java | 27 ++-- .../runners/dataflow/TransformTranslator.java | 6 +- .../dataflow/DataflowPipelineJobTest.java | 7 +- .../apache/beam/runners/spark/SparkRunner.java | 20 +-- .../beam/runners/spark/TestSparkRunner.java | 7 +- .../spark/translation/EvaluationContext.java | 11 +- .../spark/translation/TransformTranslator.java | 25 ++-- .../streaming/StreamingTransformTranslator.java | 20 +-- .../sdk/runners/PTransformOverrideFactory.java | 10 +- .../beam/sdk/runners/TransformHierarchy.java | 80 +++++------ .../beam/sdk/transforms/AppliedPTransform.java | 17 ++- .../transforms/join/KeyedPCollectionTuple.java | 12 +- .../java/org/apache/beam/sdk/values/PBegin.java | 6 +- .../apache/beam/sdk/values/PCollectionList.java | 27 ++-- .../beam/sdk/values/PCollectionTuple.java | 10 +- .../java/org/apache/beam/sdk/values/PDone.java | 6 +- .../java/org/apache/beam/sdk/values/PInput.java | 4 +- .../org/apache/beam/sdk/values/POutput.java | 4 +- .../java/org/apache/beam/sdk/values/PValue.java | 4 +- .../org/apache/beam/sdk/values/PValueBase.java | 6 +- .../apache/beam/sdk/values/TaggedPValue.java | 5 + .../java/org/apache/beam/sdk/PipelineTest.java | 20 ++- .../sdk/runners/TransformHierarchyTest.java | 72 +++++----- .../beam/sdk/values/PCollectionListTest.java | 70 ++++------ .../beam/sdk/values/PCollectionTupleTest.java | 5 +- .../beam/sdk/io/gcp/bigquery/WriteResult.java | 11 +- 71 files changed, 526 insertions(+), 658 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java index 080c5e9..440b801 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslator.java @@ -32,7 +32,8 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.UnboundedSource; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; /** * {@link Flatten.PCollections} translation to Apex operator. @@ -63,15 +64,15 @@ class FlattenPCollectionTranslator<T> implements } } - private List<PCollection<T>> extractPCollections(List<TaggedPValue> inputs) { + private List<PCollection<T>> extractPCollections(Map<TupleTag<?>, PValue> inputs) { List<PCollection<T>> collections = Lists.newArrayList(); - for (TaggedPValue pv : inputs) { + for (PValue pv : inputs.values()) { checkArgument( - pv.getValue() instanceof PCollection, + pv instanceof PCollection, "Non-PCollection provided as input to flatten: %s of type %s", - pv.getValue(), + pv, pv.getClass().getSimpleName()); - collections.add((PCollection<T>) pv.getValue()); + collections.add((PCollection<T>) pv); } return collections; } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java index fa9d21d..9213c1f 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/ParDoTranslator.java @@ -27,6 +27,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import org.apache.beam.runners.apex.ApexRunner; import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; import org.apache.beam.sdk.coders.Coder; @@ -38,7 +39,7 @@ import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.util.WindowedValue.WindowedValueCoder; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,8 +82,8 @@ class ParDoTranslator<InputT, OutputT> ApexRunner.class.getSimpleName())); } - List<TaggedPValue> outputs = context.getOutputs(); - PCollection<InputT> input = context.getInput(); + Map<TupleTag<?>, PValue> outputs = context.getOutputs(); + PCollection<InputT> input = (PCollection<InputT>) context.getInput(); List<PCollectionView<?>> sideInputs = transform.getSideInputs(); Coder<InputT> inputCoder = input.getCoder(); WindowedValueCoder<InputT> wvInputCoder = @@ -100,7 +101,7 @@ class ParDoTranslator<InputT, OutputT> context.getStateBackend()); Map<PCollection<?>, OutputPort<?>> ports = Maps.newHashMapWithExpectedSize(outputs.size()); - for (TaggedPValue output : outputs) { + for (Entry<TupleTag<?>, PValue> output : outputs.entrySet()) { checkArgument( output.getValue() instanceof PCollection, "%s %s outputs non-PCollection %s of type %s", @@ -109,12 +110,12 @@ class ParDoTranslator<InputT, OutputT> output.getValue(), output.getValue().getClass().getSimpleName()); PCollection<?> pc = (PCollection<?>) output.getValue(); - if (output.getTag().equals(transform.getMainOutputTag())) { + if (output.getKey().equals(transform.getMainOutputTag())) { ports.put(pc, operator.output); } else { int portIndex = 0; for (TupleTag<?> tag : transform.getSideOutputTags().getAll()) { - if (tag.equals(output.getTag())) { + if (tag.equals(output.getKey())) { ports.put(pc, operator.sideOutputPorts[portIndex]); break; } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java index 81507ef..c78028e 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java @@ -42,7 +42,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -85,20 +85,20 @@ class TranslationContext { return getCurrentTransform().getFullName(); } - public List<TaggedPValue> getInputs() { + public Map<TupleTag<?>, PValue> getInputs() { return getCurrentTransform().getInputs(); } public <InputT extends PValue> InputT getInput() { - return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs()).getValue(); + return (InputT) Iterables.getOnlyElement(getCurrentTransform().getInputs().values()); } - public List<TaggedPValue> getOutputs() { + public Map<TupleTag<?>, PValue> getOutputs() { return getCurrentTransform().getOutputs(); } public <OutputT extends PValue> OutputT getOutput() { - return (OutputT) Iterables.getOnlyElement(getCurrentTransform().getOutputs()).getValue(); + return (OutputT) Iterables.getOnlyElement(getCurrentTransform().getOutputs().values()); } private AppliedPTransform<?, ?, ?> getCurrentTransform() { http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java index 093385e..c12c548 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactory.java @@ -19,7 +19,6 @@ package org.apache.beam.runners.core.construction; import java.util.HashMap; -import java.util.List; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -31,7 +30,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; /** * A {@link PTransformOverrideFactory} that will apply a flatten where no element appears in the @@ -46,6 +45,7 @@ public class DeduplicatedFlattenFactory<T> } private DeduplicatedFlattenFactory() {} + @Override public PTransform<PCollectionList<T>, PCollection<T>> getReplacementTransform( PCollections<T> transform) { @@ -75,12 +75,16 @@ public class DeduplicatedFlattenFactory<T> }; } + /** + * {@inheritDoc}. + * + * <p>The input {@link PCollectionList} that is constructed will have the same values in the same + */ @Override - public PCollectionList<T> getInput( - List<TaggedPValue> inputs, Pipeline p) { + public PCollectionList<T> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) { PCollectionList<T> pCollections = PCollectionList.empty(p); - for (TaggedPValue input : inputs) { - PCollection<T> pcollection = (PCollection<T>) input.getValue(); + for (PValue input : inputs.values()) { + PCollection<T> pcollection = (PCollection<T>) input; pCollections = pCollections.and(pcollection); } return pCollections; @@ -88,7 +92,7 @@ public class DeduplicatedFlattenFactory<T> @Override public Map<PValue, ReplacementOutput> mapOutputs( - List<TaggedPValue> outputs, PCollection<T> newOutput) { + Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) { return ReplacementOutputs.singleton(outputs, newOutput); } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java index 4328cf3..936bc08 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactory.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction; import static com.google.common.base.Preconditions.checkArgument; -import java.util.List; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.VoidCoder; @@ -31,7 +30,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; /** * A {@link PTransformOverrideFactory} that provides an empty {@link Create} to replace a {@link @@ -57,7 +56,7 @@ public class EmptyFlattenAsCreateFactory<T> @Override public PCollectionList<T> getInput( - List<TaggedPValue> inputs, Pipeline p) { + Map<TupleTag<?>, PValue> inputs, Pipeline p) { checkArgument( inputs.isEmpty(), "Unexpected nonempty input %s for %s", @@ -68,7 +67,7 @@ public class EmptyFlattenAsCreateFactory<T> @Override public Map<PValue, ReplacementOutput> mapOutputs( - List<TaggedPValue> outputs, PCollection<T> newOutput) { + Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) { return ReplacementOutputs.singleton(outputs, newOutput); } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java index 6437f7e..94ec38c 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java @@ -37,7 +37,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; /** * A {@link PTransformMatcher} that matches {@link PTransform PTransforms} based on the class of the @@ -250,8 +249,8 @@ public class PTransformMatchers { public boolean matches(AppliedPTransform<?, ?, ?> application) { if (application.getTransform() instanceof Flatten.PCollections) { Set<PValue> observed = new HashSet<>(); - for (TaggedPValue pvalue : application.getInputs()) { - boolean firstInstance = observed.add(pvalue.getValue()); + for (PValue pvalue : application.getInputs().values()) { + boolean firstInstance = observed.add(pvalue); if (!firstInstance) { return true; } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java index 7bd38b0..9335f3a 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PrimitiveCreate.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.core.construction; -import java.util.List; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -30,7 +29,7 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; /** * An implementation of {@link Create} that returns a primitive {@link PCollection}. @@ -63,13 +62,13 @@ public class PrimitiveCreate<T> extends PTransform<PBegin, PCollection<T>> { } @Override - public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) { + public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) { return p.begin(); } @Override public Map<PValue, ReplacementOutput> mapOutputs( - List<TaggedPValue> outputs, PCollection<T> newOutput) { + Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) { return ReplacementOutputs.singleton(outputs, newOutput); } } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java index 11b4449..3d485ae 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ReplacementOutputs.java @@ -21,10 +21,11 @@ import static com.google.common.base.Preconditions.checkArgument; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; import org.apache.beam.sdk.values.POutput; @@ -39,60 +40,40 @@ public class ReplacementOutputs { private ReplacementOutputs() {} public static Map<PValue, ReplacementOutput> singleton( - List<TaggedPValue> original, PValue replacement) { - TaggedPValue taggedReplacement = Iterables.getOnlyElement(replacement.expand()); - return ImmutableMap.<PValue, ReplacementOutput>builder() - .put( - taggedReplacement.getValue(), - ReplacementOutput.of(Iterables.getOnlyElement(original), taggedReplacement)) - .build(); - } - - public static Map<PValue, ReplacementOutput> ordered( - List<TaggedPValue> original, POutput replacement) { - ImmutableMap.Builder<PValue, ReplacementOutput> result = ImmutableMap.builder(); - List<TaggedPValue> replacements = replacement.expand(); - checkArgument( - original.size() == replacements.size(), - "Original and Replacements must be the same size. Original: %s Replacement: %s", - original.size(), - replacements.size()); - int i = 0; - for (TaggedPValue replacementPvalue : replacements) { - result.put( - replacementPvalue.getValue(), ReplacementOutput.of(original.get(i), replacementPvalue)); - i++; - } - return result.build(); + Map<TupleTag<?>, PValue> original, PValue replacement) { + Entry<TupleTag<?>, PValue> originalElement = Iterables.getOnlyElement(original.entrySet()); + TupleTag<?> replacementTag = Iterables.getOnlyElement(replacement.expand().entrySet()).getKey(); + return Collections.singletonMap( + replacement, + ReplacementOutput.of( + TaggedPValue.of(originalElement.getKey(), originalElement.getValue()), + TaggedPValue.of(replacementTag, replacement))); } public static Map<PValue, ReplacementOutput> tagged( - List<TaggedPValue> original, POutput replacement) { + Map<TupleTag<?>, PValue> original, POutput replacement) { Map<TupleTag<?>, TaggedPValue> originalTags = new HashMap<>(); - for (TaggedPValue value : original) { - TaggedPValue former = originalTags.put(value.getTag(), value); - checkArgument( - former == null || former.equals(value), - "Found two tags in an expanded output which map to different values: output: %s " - + "Values: %s and %s", - original, - former, - value); + for (Map.Entry<TupleTag<?>, PValue> originalValue : original.entrySet()) { + originalTags.put( + originalValue.getKey(), + TaggedPValue.of(originalValue.getKey(), originalValue.getValue())); } ImmutableMap.Builder<PValue, ReplacementOutput> resultBuilder = ImmutableMap.builder(); Set<TupleTag<?>> missingTags = new HashSet<>(originalTags.keySet()); - for (TaggedPValue replacementValue : replacement.expand()) { - TaggedPValue mapped = originalTags.get(replacementValue.getTag()); + for (Map.Entry<TupleTag<?>, PValue> replacementValue : replacement.expand().entrySet()) { + TaggedPValue mapped = originalTags.get(replacementValue.getKey()); checkArgument( mapped != null, "Missing original output for Tag %s and Value %s Between original %s and replacement %s", - replacementValue.getTag(), + replacementValue.getKey(), replacementValue.getValue(), original, replacement.expand()); resultBuilder.put( - replacementValue.getValue(), ReplacementOutput.of(mapped, replacementValue)); - missingTags.remove(replacementValue.getTag()); + replacementValue.getValue(), + ReplacementOutput.of( + mapped, TaggedPValue.of(replacementValue.getKey(), replacementValue.getValue()))); + missingTags.remove(replacementValue.getKey()); } ImmutableMap<PValue, ReplacementOutput> result = resultBuilder.build(); checkArgument( http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java index 43bf556..6d0d571 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactory.java @@ -19,17 +19,16 @@ package org.apache.beam.runners.core.construction; import com.google.common.collect.Iterables; -import java.util.List; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.PTransformOverrideFactory; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; /** * A {@link PTransformOverrideFactory} which consumes from a {@link PValue} and produces a - * {@link PValue}. {@link #getInput(List, Pipeline)} and {@link #mapOutputs(List, PValue)} are + * {@link PValue}. {@link #getInput(Map, Pipeline)} and {@link #mapOutputs(Map, PValue)} are * implemented. */ public abstract class SingleInputOutputOverrideFactory< @@ -38,13 +37,13 @@ public abstract class SingleInputOutputOverrideFactory< TransformT extends PTransform<InputT, OutputT>> implements PTransformOverrideFactory<InputT, OutputT, TransformT> { @Override - public final InputT getInput(List<TaggedPValue> inputs, Pipeline p) { - return (InputT) Iterables.getOnlyElement(inputs).getValue(); + public final InputT getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) { + return (InputT) Iterables.getOnlyElement(inputs.values()); } @Override public final Map<PValue, ReplacementOutput> mapOutputs( - List<TaggedPValue> outputs, OutputT newOutput) { + Map<TupleTag<?>, PValue> outputs, OutputT newOutput) { return ReplacementOutputs.singleton(outputs, newOutput); } } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java index 38cbd2a..7b9d704 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactory.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.core.construction; -import java.util.List; import java.util.Map; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.runners.PTransformOverrideFactory; @@ -26,7 +25,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; /** * A {@link PTransformOverrideFactory} that throws an exception when a call to @@ -60,12 +59,13 @@ public final class UnsupportedOverrideFactory< } @Override - public InputT getInput(List<TaggedPValue> inputs, Pipeline p) { + public InputT getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) { throw new UnsupportedOperationException(message); } @Override - public Map<PValue, ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, OutputT newOutput) { + public Map<PValue, ReplacementOutput> mapOutputs( + Map<TupleTag<?>, PValue> outputs, OutputT newOutput) { throw new UnsupportedOperationException(message); } } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java index a251f5a..14aa1e6 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/DeduplicatedFlattenFactoryTest.java @@ -22,7 +22,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; -import com.google.common.collect.Iterables; import org.apache.beam.sdk.Pipeline.PipelineVisitor.Defaults; import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; import org.apache.beam.sdk.runners.TransformHierarchy; @@ -35,6 +34,7 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; @@ -106,7 +106,7 @@ public class DeduplicatedFlattenFactoryTest { Matchers.<PValue, ReplacementOutput>hasEntry( replacement, ReplacementOutput.of( - Iterables.getOnlyElement(original.expand()), - Iterables.getOnlyElement(replacement.expand())))); + TaggedPValue.ofExpandedValue(original), + TaggedPValue.ofExpandedValue(replacement)))); } } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java index ad9d908..90bbee7 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/EmptyFlattenAsCreateFactoryTest.java @@ -21,7 +21,6 @@ package org.apache.beam.runners.core.construction; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; -import com.google.common.collect.Iterables; import java.util.Collections; import java.util.Map; import org.apache.beam.sdk.io.CountingInput; @@ -34,6 +33,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; @@ -55,7 +55,7 @@ public class EmptyFlattenAsCreateFactoryTest { @Test public void getInputEmptySucceeds() { assertThat( - factory.getInput(Collections.<TaggedPValue>emptyList(), pipeline).size(), equalTo(0)); + factory.getInput(Collections.<TupleTag<?>, PValue>emptyMap(), pipeline).size(), equalTo(0)); } @Test @@ -80,8 +80,8 @@ public class EmptyFlattenAsCreateFactoryTest { Matchers.<PValue, ReplacementOutput>hasEntry( replacement, ReplacementOutput.of( - Iterables.getOnlyElement(original.expand()), - Iterables.getOnlyElement(replacement.expand())))); + TaggedPValue.ofExpandedValue(original), + TaggedPValue.ofExpandedValue(replacement)))); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index d053f62..4084cdc 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -24,7 +24,7 @@ import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertThat; import com.google.common.base.MoreObjects; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.io.Serializable; import java.util.Collections; import org.apache.beam.sdk.coders.VarIntCoder; @@ -65,7 +65,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.hamcrest.Matchers; @@ -385,17 +385,15 @@ public class PTransformMatchersTest implements Serializable { public void emptyFlattenWithEmptyFlatten() { AppliedPTransform application = AppliedPTransform - .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>> - of( - "EmptyFlatten", - Collections.<TaggedPValue>emptyList(), - Collections.singletonList( - TaggedPValue.of( - new TupleTag<Object>(), - PCollection.createPrimitiveOutputInternal( - p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))), - Flatten.pCollections(), - p); + .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of( + "EmptyFlatten", + Collections.<TupleTag<?>, PValue>emptyMap(), + Collections.<TupleTag<?>, PValue>singletonMap( + new TupleTag<Object>(), + PCollection.createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)), + Flatten.pCollections(), + p); assertThat(PTransformMatchers.emptyFlatten().matches(application), is(true)); } @@ -404,21 +402,18 @@ public class PTransformMatchersTest implements Serializable { public void emptyFlattenWithNonEmptyFlatten() { AppliedPTransform application = AppliedPTransform - .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>> - of( - "Flatten", - Collections.singletonList( - TaggedPValue.of( - new TupleTag<Object>(), - PCollection.createPrimitiveOutputInternal( - p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))), - Collections.singletonList( - TaggedPValue.of( - new TupleTag<Object>(), - PCollection.createPrimitiveOutputInternal( - p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))), - Flatten.pCollections(), - p); + .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of( + "Flatten", + Collections.<TupleTag<?>, PValue>singletonMap( + new TupleTag<Object>(), + PCollection.createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)), + Collections.<TupleTag<?>, PValue>singletonMap( + new TupleTag<Object>(), + PCollection.createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)), + Flatten.pCollections(), + p); assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false)); } @@ -427,18 +422,16 @@ public class PTransformMatchersTest implements Serializable { public void emptyFlattenWithNonFlatten() { AppliedPTransform application = AppliedPTransform - .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>> - of( - "EmptyFlatten", - Collections.<TaggedPValue>emptyList(), - Collections.singletonList( - TaggedPValue.of( - new TupleTag<Object>(), - PCollection.createPrimitiveOutputInternal( - p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))), - Flatten.iterables() /* This isn't actually possible to construct, + .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of( + "EmptyFlatten", + Collections.<TupleTag<?>, PValue>emptyMap(), + Collections.<TupleTag<?>, PValue>singletonMap( + new TupleTag<Object>(), + PCollection.createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)), + Flatten.iterables() /* This isn't actually possible to construct, * but for the sake of example */, - p); + p); assertThat(PTransformMatchers.emptyFlatten().matches(application), is(false)); } @@ -447,19 +440,16 @@ public class PTransformMatchersTest implements Serializable { public void flattenWithDuplicateInputsWithoutDuplicates() { AppliedPTransform application = AppliedPTransform - .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>> - of( + .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of( "Flatten", - Collections.singletonList( - TaggedPValue.of( - new TupleTag<Object>(), - PCollection.createPrimitiveOutputInternal( - p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))), - Collections.singletonList( - TaggedPValue.of( - new TupleTag<Object>(), - PCollection.createPrimitiveOutputInternal( - p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))), + Collections.<TupleTag<?>, PValue>singletonMap( + new TupleTag<Object>(), + PCollection.createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)), + Collections.<TupleTag<?>, PValue>singletonMap( + new TupleTag<Object>(), + PCollection.createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)), Flatten.pCollections(), p); @@ -468,21 +458,21 @@ public class PTransformMatchersTest implements Serializable { @Test public void flattenWithDuplicateInputsWithDuplicates() { - PCollection<Object> duplicate = PCollection.createPrimitiveOutputInternal(p, - WindowingStrategy.globalDefault(), - IsBounded.BOUNDED); + PCollection<Object> duplicate = + PCollection.createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED); AppliedPTransform application = AppliedPTransform .<PCollectionList<Object>, PCollection<Object>, Flatten.PCollections<Object>>of( "Flatten", - ImmutableList.of( - TaggedPValue.of(new TupleTag<Object>(), duplicate), - TaggedPValue.of(new TupleTag<Object>(), duplicate)), - Collections.singletonList( - TaggedPValue.of( - new TupleTag<Object>(), - PCollection.createPrimitiveOutputInternal( - p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))), + ImmutableMap.<TupleTag<?>, PValue>builder() + .put(new TupleTag<Object>(), duplicate) + .put(new TupleTag<Object>(), duplicate) + .build(), + Collections.<TupleTag<?>, PValue>singletonMap( + new TupleTag<Object>(), + PCollection.createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)), Flatten.pCollections(), p); @@ -493,15 +483,13 @@ public class PTransformMatchersTest implements Serializable { public void flattenWithDuplicateInputsNonFlatten() { AppliedPTransform application = AppliedPTransform - .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>> - of( + .<PCollection<Iterable<Object>>, PCollection<Object>, Flatten.Iterables<Object>>of( "EmptyFlatten", - Collections.<TaggedPValue>emptyList(), - Collections.singletonList( - TaggedPValue.of( - new TupleTag<Object>(), - PCollection.createPrimitiveOutputInternal( - p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED))), + Collections.<TupleTag<?>, PValue>emptyMap(), + Collections.<TupleTag<?>, PValue>singletonMap( + new TupleTag<Object>(), + PCollection.createPrimitiveOutputInternal( + p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED)), Flatten.iterables() /* This isn't actually possible to construct, * but for the sake of example */, p); @@ -541,8 +529,8 @@ public class PTransformMatchersTest implements Serializable { private AppliedPTransform<?, ?, ?> appliedWrite(Write<Integer> write) { return AppliedPTransform.<PCollection<Integer>, PDone, Write<Integer>>of( "Write", - Collections.<TaggedPValue>emptyList(), - Collections.<TaggedPValue>emptyList(), + Collections.<TupleTag<?>, PValue>emptyMap(), + Collections.<TupleTag<?>, PValue>emptyMap(), write, p); } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java index abfdeef..00c436d 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ReplacementOutputsTest.java @@ -21,18 +21,15 @@ package org.apache.beam.runners.core.construction; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertThat; -import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import java.util.List; import java.util.Map; import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; -import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TupleTag; @@ -79,8 +76,10 @@ public class ReplacementOutputsTest { assertThat(replacements, Matchers.<PValue>hasKey(replacementInts)); ReplacementOutput replacement = replacements.get(replacementInts); - TaggedPValue taggedInts = Iterables.getOnlyElement(ints.expand()); - assertThat(replacement.getOriginal(), equalTo(taggedInts)); + Map.Entry<TupleTag<?>, PValue> taggedInts = Iterables.getOnlyElement(ints.expand().entrySet()); + assertThat( + replacement.getOriginal().getTag(), Matchers.<TupleTag<?>>equalTo(taggedInts.getKey())); + assertThat(replacement.getOriginal().getValue(), equalTo(taggedInts.getValue())); assertThat(replacement.getReplacement().getValue(), Matchers.<PValue>equalTo(replacementInts)); } @@ -88,44 +87,11 @@ public class ReplacementOutputsTest { public void singletonMultipleOriginalsThrows() { thrown.expect(IllegalArgumentException.class); ReplacementOutputs.singleton( - ImmutableList.copyOf(Iterables.concat(ints.expand(), moreInts.expand())), replacementInts); - } - - @Test - public void orderedSucceeds() { - List<TaggedPValue> originals = PCollectionList.of(ints).and(moreInts).expand(); - Map<PValue, ReplacementOutput> replacements = - ReplacementOutputs.ordered( - originals, PCollectionList.of(replacementInts).and(moreReplacementInts)); - assertThat( - replacements.keySet(), - Matchers.<PValue>containsInAnyOrder(replacementInts, moreReplacementInts)); - - ReplacementOutput intsMapping = replacements.get(replacementInts); - assertThat(intsMapping.getOriginal().getValue(), Matchers.<PValue>equalTo(ints)); - assertThat(intsMapping.getReplacement().getValue(), Matchers.<PValue>equalTo(replacementInts)); - - ReplacementOutput moreIntsMapping = replacements.get(moreReplacementInts); - assertThat(moreIntsMapping.getOriginal().getValue(), Matchers.<PValue>equalTo(moreInts)); - assertThat( - moreIntsMapping.getReplacement().getValue(), Matchers.<PValue>equalTo(moreReplacementInts)); - } - - @Test - public void orderedTooManyReplacements() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("same size"); - ReplacementOutputs.ordered( - PCollectionList.of(ints).expand(), - PCollectionList.of(replacementInts).and(moreReplacementInts)); - } - - @Test - public void orderedTooFewReplacements() { - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("same size"); - ReplacementOutputs.ordered( - PCollectionList.of(ints).and(moreInts).expand(), PCollectionList.of(moreReplacementInts)); + ImmutableMap.<TupleTag<?>, PValue>builder() + .putAll(ints.expand()) + .putAll(moreInts.expand()) + .build(), + replacementInts); } private TupleTag<Integer> intsTag = new TupleTag<>(); @@ -168,61 +134,6 @@ public class ReplacementOutputsTest { TaggedPValue.of(moreIntsTag, moreReplacementInts)))); } - /** - * When a call to {@link ReplacementOutputs#tagged(List, POutput)} is made where the first - * argument contains multiple copies of the same {@link TaggedPValue}, the call succeeds using - * that mapping. - */ - @Test - public void taggedMultipleInstances() { - List<TaggedPValue> original = - ImmutableList.of( - TaggedPValue.of(intsTag, ints), - TaggedPValue.of(strsTag, strs), - TaggedPValue.of(intsTag, ints)); - - Map<PValue, ReplacementOutput> replacements = - ReplacementOutputs.tagged( - original, PCollectionTuple.of(strsTag, replacementStrs).and(intsTag, replacementInts)); - assertThat( - replacements.keySet(), - Matchers.<PValue>containsInAnyOrder(replacementStrs, replacementInts)); - ReplacementOutput intsReplacement = replacements.get(replacementInts); - ReplacementOutput strsReplacement = replacements.get(replacementStrs); - - assertThat( - intsReplacement, - equalTo( - ReplacementOutput.of( - TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, replacementInts)))); - assertThat( - strsReplacement, - equalTo( - ReplacementOutput.of( - TaggedPValue.of(strsTag, strs), TaggedPValue.of(strsTag, replacementStrs)))); - } - - /** - * When a call to {@link ReplacementOutputs#tagged(List, POutput)} is made where a single tag - * has multiple {@link PValue PValues} mapped to it, the call fails. - */ - @Test - public void taggedMultipleConflictingInstancesThrows() { - List<TaggedPValue> original = - ImmutableList.of( - TaggedPValue.of(intsTag, ints), TaggedPValue.of(intsTag, moreReplacementInts)); - thrown.expect(IllegalArgumentException.class); - thrown.expectMessage("different values"); - thrown.expectMessage(intsTag.toString()); - thrown.expectMessage(ints.toString()); - thrown.expectMessage(moreReplacementInts.toString()); - ReplacementOutputs.tagged( - original, - PCollectionTuple.of(strsTag, replacementStrs) - .and(moreIntsTag, moreReplacementInts) - .and(intsTag, replacementInts)); - } - @Test public void taggedMissingReplacementThrows() { PCollectionTuple original = http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java index b4cdd1f..07352f5 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/SingleInputOutputOverrideFactoryTest.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.core.construction; import static org.junit.Assert.assertThat; -import com.google.common.collect.Iterables; import java.io.Serializable; import java.util.Map; import org.apache.beam.sdk.runners.PTransformOverrideFactory.ReplacementOutput; @@ -32,6 +31,7 @@ import org.apache.beam.sdk.transforms.SimpleFunction; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionList; import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TaggedPValue; import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; @@ -97,8 +97,8 @@ public class SingleInputOutputOverrideFactoryTest implements Serializable { Matchers.<PValue, ReplacementOutput>hasEntry( reappliedOutput, ReplacementOutput.of( - Iterables.getOnlyElement(output.expand()), - Iterables.getOnlyElement(reappliedOutput.expand())))); + TaggedPValue.ofExpandedValue(output), + TaggedPValue.ofExpandedValue(reappliedOutput)))); } @Test http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java index f33d0f9..81ce00d 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/UnsupportedOverrideFactoryTest.java @@ -23,7 +23,8 @@ import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -53,13 +54,13 @@ public class UnsupportedOverrideFactoryTest { public void getInputThrows() { thrown.expect(UnsupportedOperationException.class); thrown.expectMessage(message); - factory.getInput(Collections.<TaggedPValue>emptyList(), pipeline); + factory.getInput(Collections.<TupleTag<?>, PValue>emptyMap(), pipeline); } @Test public void mapOutputThrows() { thrown.expect(UnsupportedOperationException.class); thrown.expectMessage(message); - factory.mapOutputs(Collections.<TaggedPValue>emptyList(), PDone.in(pipeline)); + factory.mapOutputs(Collections.<TupleTag<?>, PValue>emptyMap(), PDone.in(pipeline)); } } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java index 57735e7..5bd6f7e 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java @@ -117,7 +117,7 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory { ExecutorService executor) { this.evaluationContext = evaluationContext; this.outputPCollection = - (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue(); + (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs().values()); this.resultBuilder = StepTransformResult.withoutHold(transform); this.minimumDynamicSplitSize = minimumDynamicSplitSize; this.produceSplitExecutor = executor; http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java index 7e6845d..c342136 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectGraphVisitor.java @@ -34,7 +34,6 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; /** * Tracks the {@link AppliedPTransform AppliedPTransforms} that consume each {@link PValue} in the @@ -83,8 +82,8 @@ class DirectGraphVisitor extends PipelineVisitor.Defaults { if (node.getInputs().isEmpty()) { rootTransforms.add(appliedTransform); } else { - for (TaggedPValue value : node.getInputs()) { - primitiveConsumers.put(value.getValue(), appliedTransform); + for (PValue value : node.getInputs().values()) { + primitiveConsumers.put(value, appliedTransform); } } } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 8b9f995..c802c58 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -534,8 +534,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor { .createKeyedBundle( transformTimers.getKey(), (PCollection) - Iterables.getOnlyElement(transformTimers.getTransform().getInputs()) - .getValue()) + Iterables.getOnlyElement( + transformTimers.getTransform().getInputs().values())) .add(WindowedValue.valueInGlobalWindow(work)) .commit(evaluationContext.now()); scheduleConsumption( http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java index 8528905..7c6d2a1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/FlattenEvaluatorFactory.java @@ -57,7 +57,7 @@ class FlattenEvaluatorFactory implements TransformEvaluatorFactory { application) { final UncommittedBundle<InputT> outputBundle = evaluationContext.createBundle( - (PCollection<InputT>) Iterables.getOnlyElement(application.getOutputs()).getValue()); + (PCollection<InputT>) Iterables.getOnlyElement(application.getOutputs().values())); final TransformResult<InputT> result = StepTransformResult.<InputT>withoutHold(application).addOutput(outputBundle).build(); return new FlattenEvaluator<>(outputBundle, result); http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 04e5aaa..f7fd4cf 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -162,7 +162,7 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { evaluationContext.createKeyedBundle( structuralKey, (PCollection<KV<K, Iterable<V>>>) - Iterables.getOnlyElement(application.getOutputs()).getValue()); + Iterables.getOnlyElement(application.getOutputs().values())); outputBundles.add(bundle); CopyOnAccessInMemoryStateInternals<K> stateInternals = (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals(); http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java index ef96a92..ac0b14f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupByKeyOnlyEvaluatorFactory.java @@ -105,7 +105,7 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory { this.application = application; this.keyCoder = getKeyCoder( - ((PCollection<KV<K, V>>) Iterables.getOnlyElement(application.getInputs()).getValue()) + ((PCollection<KV<K, V>>) Iterables.getOnlyElement(application.getInputs().values())) .getCoder()); this.groupingMap = new HashMap<>(); } @@ -158,7 +158,7 @@ class GroupByKeyOnlyEvaluatorFactory implements TransformEvaluatorFactory { evaluationContext.createKeyedBundle( StructuralKey.of(key, keyCoder), (PCollection<KeyedWorkItem<K, V>>) - Iterables.getOnlyElement(application.getOutputs()).getValue()); + Iterables.getOnlyElement(application.getOutputs().values())); bundle.add(WindowedValue.valueInGlobalWindow(groupedKv)); resultBuilder.addOutput(bundle); } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java index 02b1bed..f9b6eba 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/KeyedPValueTrackingVisitor.java @@ -21,7 +21,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.ImmutableSet; import java.util.HashSet; -import java.util.List; +import java.util.Map; import java.util.Set; import org.apache.beam.runners.core.SplittableParDo; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; @@ -32,7 +32,7 @@ import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; /** * A pipeline visitor that tracks all keyed {@link PValue PValues}. A {@link PValue} is keyed if it @@ -83,9 +83,9 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor { if (node.isRootNode()) { finalized = true; } else if (PRODUCES_KEYED_OUTPUTS.contains(node.getTransform().getClass())) { - List<TaggedPValue> outputs = node.getOutputs(); - for (TaggedPValue output : outputs) { - keyedValues.add(output.getValue()); + Map<TupleTag<?>, PValue> outputs = node.getOutputs(); + for (PValue output : outputs.values()) { + keyedValues.add(output); } } } @@ -96,8 +96,8 @@ class KeyedPValueTrackingVisitor implements PipelineVisitor { @Override public void visitValue(PValue value, TransformHierarchy.Node producer) { boolean inputsAreKeyed = true; - for (TaggedPValue input : producer.getInputs()) { - inputsAreKeyed = inputsAreKeyed && keyedValues.contains(input.getValue()); + for (PValue input : producer.getInputs().values()) { + inputsAreKeyed = inputsAreKeyed && keyedValues.contains(input); } if (PRODUCES_KEYED_OUTPUTS.contains(producer.getTransform().getClass()) || (isKeyPreserving(producer.getTransform()) && inputsAreKeyed)) { http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index b0e97fb..b8a13e2 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -32,7 +32,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -141,7 +141,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator evaluationContext, stepContext, application, - ((PCollection<InputT>) Iterables.getOnlyElement(application.getInputs()).getValue()) + ((PCollection<InputT>) Iterables.getOnlyElement(application.getInputs().values())) .getWindowingStrategy(), fn, key, @@ -162,10 +162,10 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator } } - private Map<TupleTag<?>, PCollection<?>> pcollections(List<TaggedPValue> outputs) { + private Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) { Map<TupleTag<?>, PCollection<?>> pcs = new HashMap<>(); - for (TaggedPValue output : outputs) { - pcs.put(output.getTag(), (PCollection<?>) output.getValue()); + for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) { + pcs.put(output.getKey(), (PCollection<?>) output.getValue()); } return pcs; } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java index 056a0c3..00c0d6a 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoMultiOverrideFactory.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkState; import com.google.common.collect.Iterables; -import java.util.List; import java.util.Map; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItemCoder; @@ -50,7 +49,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.TupleTagList; import org.apache.beam.sdk.values.TypedPValue; @@ -86,13 +85,13 @@ class ParDoMultiOverrideFactory<InputT, OutputT> @Override public PCollection<? extends InputT> getInput( - List<TaggedPValue> inputs, Pipeline p) { - return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs).getValue(); + Map<TupleTag<?>, PValue> inputs, Pipeline p) { + return (PCollection<? extends InputT>) Iterables.getOnlyElement(inputs.values()); } @Override public Map<PValue, ReplacementOutput> mapOutputs( - List<TaggedPValue> outputs, PCollectionTuple newOutput) { + Map<TupleTag<?>, PValue> outputs, PCollectionTuple newOutput) { return ReplacementOutputs.tagged(outputs, newOutput); } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index 77bebb2..f8fe3d6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -27,6 +27,7 @@ import com.google.common.collect.Lists; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; import org.apache.beam.runners.core.StateNamespace; @@ -52,7 +53,7 @@ import org.apache.beam.sdk.util.state.StateSpec; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; /** A {@link TransformEvaluatorFactory} for stateful {@link ParDo}. */ @@ -139,8 +140,9 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo String stepName = evaluationContext.getStepName(transformOutputWindow.getTransform()); Map<TupleTag<?>, PCollection<?>> taggedValues = new HashMap<>(); - for (TaggedPValue pv : transformOutputWindow.getTransform().getOutputs()) { - taggedValues.put(pv.getTag(), (PCollection<?>) pv.getValue()); + for (Entry<TupleTag<?>, PValue> pv : + transformOutputWindow.getTransform().getOutputs().entrySet()) { + taggedValues.put(pv.getKey(), (PCollection<?>) pv.getValue()); } PCollection<?> pc = taggedValues http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java index 0dd8919..6e0a4fc 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TestStreamEvaluatorFactory.java @@ -48,8 +48,8 @@ import org.apache.beam.sdk.values.PBegin; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; import org.apache.beam.sdk.values.TimestampedValue; +import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; import org.joda.time.Instant; @@ -108,7 +108,7 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { if (event.getType().equals(EventType.ELEMENT)) { UncommittedBundle<T> bundle = context.createBundle( - (PCollection<T>) Iterables.getOnlyElement(application.getOutputs()).getValue()); + (PCollection<T>) Iterables.getOnlyElement(application.getOutputs().values())); for (TimestampedValue<T> elem : ((ElementEvent<T>) event).getElements()) { bundle.add( WindowedValue.timestampedValueInGlobalWindow(elem.getValue(), elem.getTimestamp())); @@ -176,13 +176,13 @@ class TestStreamEvaluatorFactory implements TransformEvaluatorFactory { } @Override - public PBegin getInput(List<TaggedPValue> inputs, Pipeline p) { + public PBegin getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) { return p.begin(); } @Override public Map<PValue, ReplacementOutput> mapOutputs( - List<TaggedPValue> outputs, PCollection<T> newOutput) { + Map<TupleTag<?>, PValue> outputs, PCollection<T> newOutput) { return ReplacementOutputs.singleton(outputs, newOutput); } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java index f0eef58..91e7248 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.java @@ -121,7 +121,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { WindowedValue<UnboundedSourceShard<OutputT, CheckpointMarkT>> element) throws IOException { UncommittedBundle<OutputT> output = evaluationContext.createBundle( - (PCollection<OutputT>) getOnlyElement(transform.getOutputs()).getValue()); + (PCollection<OutputT>) getOnlyElement(transform.getOutputs().values())); UnboundedSourceShard<OutputT, CheckpointMarkT> shard = element.getValue(); UnboundedReader<OutputT> reader = null; try { @@ -227,7 +227,7 @@ class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory { // committing the output. if (!reader.getWatermark().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) { PCollection<OutputT> outputPc = - (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue(); + (PCollection<OutputT>) Iterables.getOnlyElement(transform.getOutputs().values()); evaluationContext.scheduleAfterOutputWouldBeProduced( outputPc, GlobalWindow.INSTANCE, http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java index dc74d3e..8cbe8fc 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java @@ -64,9 +64,9 @@ class ViewEvaluatorFactory implements TransformEvaluatorFactory { final AppliedPTransform<PCollection<Iterable<InT>>, PCollectionView<OuT>, WriteView<InT, OuT>> application) { PCollection<Iterable<InT>> input = - (PCollection<Iterable<InT>>) Iterables.getOnlyElement(application.getInputs()).getValue(); + (PCollection<Iterable<InT>>) Iterables.getOnlyElement(application.getInputs().values()); final PCollectionViewWriter<InT, OuT> writer = context.createPCollectionViewWriter(input, - (PCollectionView<OuT>) Iterables.getOnlyElement(application.getOutputs()).getValue()); + (PCollectionView<OuT>) Iterables.getOnlyElement(application.getOutputs().values())); return new TransformEvaluator<Iterable<InT>>() { private final List<WindowedValue<InT>> elements = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java index 64e1218..52dc329 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewOverrideFactory.java @@ -20,7 +20,6 @@ package org.apache.beam.runners.direct; import com.google.common.collect.Iterables; import java.util.Collections; -import java.util.List; import java.util.Map; import org.apache.beam.runners.core.construction.ForwardingPTransform; import org.apache.beam.sdk.Pipeline; @@ -35,7 +34,7 @@ import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; /** * A {@link PTransformOverrideFactory} that provides overrides for the {@link CreatePCollectionView} @@ -51,13 +50,13 @@ class ViewOverrideFactory<ElemT, ViewT> } @Override - public PCollection<ElemT> getInput(List<TaggedPValue> inputs, Pipeline p) { - return (PCollection<ElemT>) Iterables.getOnlyElement(inputs).getValue(); + public PCollection<ElemT> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) { + return (PCollection<ElemT>) Iterables.getOnlyElement(inputs.values()); } @Override public Map<PValue, ReplacementOutput> mapOutputs( - List<TaggedPValue> outputs, PCollectionView<ViewT> newOutput) { + Map<TupleTag<?>, PValue> outputs, PCollectionView<ViewT> newOutput) { return Collections.emptyMap(); } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java index 942e67c..8c04362 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WatermarkManager.java @@ -61,7 +61,8 @@ import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.PValue; +import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; /** @@ -818,13 +819,13 @@ public class WatermarkManager { private Collection<Watermark> getInputProcessingWatermarks(AppliedPTransform<?, ?, ?> transform) { ImmutableList.Builder<Watermark> inputWmsBuilder = ImmutableList.builder(); - List<TaggedPValue> inputs = transform.getInputs(); + Map<TupleTag<?>, PValue> inputs = transform.getInputs(); if (inputs.isEmpty()) { inputWmsBuilder.add(THE_END_OF_TIME); } - for (TaggedPValue pvalue : inputs) { + for (PValue pvalue : inputs.values()) { Watermark producerOutputWatermark = - getTransformWatermark(graph.getProducer(pvalue.getValue())) + getTransformWatermark(graph.getProducer(pvalue)) .synchronizedProcessingOutputWatermark; inputWmsBuilder.add(producerOutputWatermark); } @@ -833,13 +834,13 @@ public class WatermarkManager { private List<Watermark> getInputWatermarks(AppliedPTransform<?, ?, ?> transform) { ImmutableList.Builder<Watermark> inputWatermarksBuilder = ImmutableList.builder(); - List<TaggedPValue> inputs = transform.getInputs(); + Map<TupleTag<?>, PValue> inputs = transform.getInputs(); if (inputs.isEmpty()) { inputWatermarksBuilder.add(THE_END_OF_TIME); } - for (TaggedPValue pvalue : inputs) { + for (PValue pvalue : inputs.values()) { Watermark producerOutputWatermark = - getTransformWatermark(graph.getProducer(pvalue.getValue())).outputWatermark; + getTransformWatermark(graph.getProducer(pvalue)).outputWatermark; inputWatermarksBuilder.add(producerOutputWatermark); } List<Watermark> inputCollectionWatermarks = inputWatermarksBuilder.build(); @@ -1023,8 +1024,8 @@ public class WatermarkManager { WatermarkUpdate updateResult = myWatermarks.refresh(); if (updateResult.isAdvanced()) { Set<AppliedPTransform<?, ?, ?>> additionalRefreshes = new HashSet<>(); - for (TaggedPValue outputPValue : toRefresh.getOutputs()) { - additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue.getValue())); + for (PValue outputPValue : toRefresh.getOutputs().values()) { + additionalRefreshes.addAll(graph.getPrimitiveConsumers(outputPValue)); } return additionalRefreshes; } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java index 8974c67..2550924 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -57,7 +57,7 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory { WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn(); UncommittedBundle<InputT> outputBundle = evaluationContext.createBundle( - (PCollection<InputT>) Iterables.getOnlyElement(transform.getOutputs()).getValue()); + (PCollection<InputT>) Iterables.getOnlyElement(transform.getOutputs().values())); if (fn == null) { return PassthroughTransformEvaluator.create(transform, outputBundle); } http://git-wip-us.apache.org/repos/asf/beam/blob/0e5737fd/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java index 1bf5839..b3f92ab 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java @@ -24,7 +24,6 @@ import com.google.common.base.Suppliers; import com.google.common.collect.Iterables; import java.io.Serializable; import java.util.Collections; -import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.Pipeline; @@ -41,7 +40,7 @@ import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PDone; import org.apache.beam.sdk.values.PValue; -import org.apache.beam.sdk.values.TaggedPValue; +import org.apache.beam.sdk.values.TupleTag; /** * A {@link PTransformOverrideFactory} that overrides {@link Write} {@link PTransform PTransforms} @@ -60,12 +59,13 @@ class WriteWithShardingFactory<InputT> } @Override - public PCollection<InputT> getInput(List<TaggedPValue> inputs, Pipeline p) { - return (PCollection<InputT>) Iterables.getOnlyElement(inputs).getValue(); + public PCollection<InputT> getInput(Map<TupleTag<?>, PValue> inputs, Pipeline p) { + return (PCollection<InputT>) Iterables.getOnlyElement(inputs.values()); } @Override - public Map<PValue, ReplacementOutput> mapOutputs(List<TaggedPValue> outputs, PDone newOutput) { + public Map<PValue, ReplacementOutput> mapOutputs( + Map<TupleTag<?>, PValue> outputs, PDone newOutput) { return Collections.emptyMap(); }