[BEAM-2926] Migrate to using a trivial multimap materialization within the Java SDK.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5e2593da Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5e2593da Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5e2593da Branch: refs/heads/tez-runner Commit: 5e2593daacec83e876b747d56d8c335531a54d1d Parents: 7ce0a82 Author: Luke Cwik <lc...@google.com> Authored: Fri Nov 10 11:08:24 2017 -0800 Committer: Luke Cwik <lc...@google.com> Committed: Wed Nov 15 08:59:51 2017 -0800 ---------------------------------------------------------------------- .../apex/translation/ParDoTranslatorTest.java | 3 +- .../core/construction/PTransformMatchers.java | 3 +- .../core/construction/ParDoTranslation.java | 17 +- .../CreatePCollectionViewTranslationTest.java | 10 +- .../construction/PTransformMatchersTest.java | 33 +- .../core/construction/ParDoTranslationTest.java | 7 +- .../core/InMemoryMultimapSideInputView.java | 62 +++ .../beam/runners/core/SideInputHandler.java | 63 ++-- .../core/InMemoryMultimapSideInputViewTest.java | 53 +++ .../beam/runners/core/SideInputHandlerTest.java | 89 +++-- .../beam/runners/direct/SideInputContainer.java | 38 +- .../runners/direct/EvaluationContextTest.java | 44 ++- .../runners/direct/SideInputContainerTest.java | 226 +++++------ .../direct/ViewEvaluatorFactoryTest.java | 13 +- .../runners/direct/ViewOverrideFactoryTest.java | 9 +- .../direct/WriteWithShardingFactoryTest.java | 9 +- .../FlinkStreamingTransformTranslators.java | 1 - .../functions/FlinkSideInputReader.java | 27 +- .../functions/SideInputInitializer.java | 50 ++- .../flink/streaming/DoFnOperatorTest.java | 40 +- .../DataflowPipelineTranslatorTest.java | 12 +- .../spark/translation/TransformTranslator.java | 7 +- .../spark/util/SparkSideInputReader.java | 50 ++- .../org/apache/beam/sdk/transforms/Combine.java | 13 +- .../apache/beam/sdk/transforms/DoFnTester.java | 20 +- .../beam/sdk/transforms/Materializations.java | 29 +- .../org/apache/beam/sdk/transforms/View.java | 67 +++- .../org/apache/beam/sdk/transforms/ViewFn.java | 6 +- .../apache/beam/sdk/values/PCollectionView.java | 7 +- .../beam/sdk/values/PCollectionViews.java | 256 ++++++------- .../sdk/testing/PCollectionViewTesting.java | 375 +++---------------- .../beam/sdk/transforms/DoFnTesterTest.java | 12 +- .../beam/sdk/io/gcp/bigquery/BatchLoads.java | 14 +- 33 files changed, 809 insertions(+), 856 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java index 73382e3..4a4ca1d 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/ParDoTranslatorTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.apex.translation; +import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -219,7 +220,7 @@ public class ParDoTranslatorTest { operator.beginWindow(0); WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1); WindowedValue<Iterable<?>> sideInput = WindowedValue.<Iterable<?>>valueInGlobalWindow( - Lists.<Integer>newArrayList(22)); + materializeValuesFor(View.asSingleton(), 22)); operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back input final List<Object> results = Lists.newArrayList(); http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java index 0d27241..42ac73f 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java @@ -36,7 +36,6 @@ import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignature.ProcessElementMethod; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PValue; @@ -304,7 +303,7 @@ public class PTransformMatchers { } CreatePCollectionView<?, ?> createView = (CreatePCollectionView<?, ?>) application.getTransform(); - ViewFn<Iterable<WindowedValue<?>>, ?> viewFn = createView.getView().getViewFn(); + ViewFn<?, ?> viewFn = createView.getView().getViewFn(); return viewFn.getClass().equals(viewFnType); } }; http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java index f88cbe5..e00b912 100644 --- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java +++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/ParDoTranslation.java @@ -50,7 +50,6 @@ import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput; import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput.Builder; import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator; import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.state.StateSpec; import org.apache.beam.sdk.state.StateSpecs; @@ -73,8 +72,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature.TimerDeclaration; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.SerializableUtils; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PValue; @@ -561,25 +558,19 @@ public class ParDoTranslation { ViewFn<?, ?> viewFn = viewFnFromProto(sideInput.getViewFn()); WindowingStrategy<?, ?> windowingStrategy = pCollection.getWindowingStrategy().fixDefaults(); - Coder<Iterable<WindowedValue<?>>> coder = - (Coder) - IterableCoder.of( - FullWindowedValueCoder.of( - pCollection.getCoder(), - pCollection.getWindowingStrategy().getWindowFn().windowCoder())); checkArgument( - sideInput.getAccessPattern().getUrn().equals(Materializations.ITERABLE_MATERIALIZATION_URN), + sideInput.getAccessPattern().getUrn().equals(Materializations.MULTIMAP_MATERIALIZATION_URN), "Unknown View Materialization URN %s", sideInput.getAccessPattern().getUrn()); PCollectionView<?> view = new RunnerPCollectionView<>( pCollection, - (TupleTag<Iterable<WindowedValue<?>>>) tag, - (ViewFn<Iterable<WindowedValue<?>>, ?>) viewFn, + (TupleTag) tag, + (ViewFn) viewFn, windowMappingFn, windowingStrategy, - coder); + (Coder) pCollection.getCoder()); return view; } http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java index df659a8..690e3ca 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/CreatePCollectionViewTranslationTest.java @@ -23,12 +23,14 @@ import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; import org.apache.beam.model.pipeline.v1.RunnerApi.FunctionSpec; import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.runners.AppliedPTransform; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.PCollectionViews; @@ -63,12 +65,11 @@ public class CreatePCollectionViewTranslationTest { testPCollection.getWindowingStrategy(), false, null, - testPCollection.getCoder())), + StringUtf8Coder.of())), CreatePCollectionView.of( PCollectionViews.listView( testPCollection, - testPCollection.getWindowingStrategy(), - testPCollection.getCoder()))); + testPCollection.getWindowingStrategy()))); } @Parameter(0) @@ -76,7 +77,8 @@ public class CreatePCollectionViewTranslationTest { public static TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); - private static final PCollection<String> testPCollection = p.apply(Create.of("one")); + private static final PCollection<KV<Void, String>> testPCollection = + p.apply(Create.of(KV.of((Void) null, "one"))); @Test public void testEncodedProto() throws Exception { http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java index 324e38d..c2dab4c 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java @@ -54,8 +54,6 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.Materialization; -import org.apache.beam.sdk.transforms.Materializations; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.Sum; @@ -67,7 +65,6 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollection.IsBounded; @@ -376,9 +373,8 @@ public class PTransformMatchersTest implements Serializable { @Test public void createViewWithViewFn() { PCollection<Integer> input = p.apply(Create.of(1)); - PCollectionView<Iterable<Integer>> view = - PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder()); - ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>> viewFn = view.getViewFn(); + PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable()); + ViewFn<?, ?> viewFn = view.getViewFn(); CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view); PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass()); @@ -388,23 +384,10 @@ public class PTransformMatchersTest implements Serializable { @Test public void createViewWithViewFnDifferentViewFn() { PCollection<Integer> input = p.apply(Create.of(1)); - PCollectionView<Iterable<Integer>> view = - PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder()); - ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>> viewFn = - new ViewFn<Iterable<WindowedValue<?>>, Iterable<Integer>>() { - @Override - public Materialization<Iterable<WindowedValue<?>>> getMaterialization() { - @SuppressWarnings({"rawtypes", "unchecked"}) - Materialization<Iterable<WindowedValue<?>>> materialization = - (Materialization) Materializations.iterable(); - return materialization; - } - - @Override - public Iterable<Integer> apply(Iterable<WindowedValue<?>> contents) { - return Collections.emptyList(); - } - }; + PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable()); + + // Purposely create a subclass to get a different class then what was expected. + ViewFn<?, ?> viewFn = new PCollectionViews.IterableViewFn() {}; CreatePCollectionView<?, ?> createView = CreatePCollectionView.of(view); PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(viewFn.getClass()); @@ -414,9 +397,7 @@ public class PTransformMatchersTest implements Serializable { @Test public void createViewWithViewFnNotCreatePCollectionView() { PCollection<Integer> input = p.apply(Create.of(1)); - PCollectionView<Iterable<Integer>> view = - PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder()); - + PCollectionView<Iterable<Integer>> view = input.apply(View.<Integer>asIterable()); PTransformMatcher matcher = PTransformMatchers.createViewWithViewFn(view.getViewFn().getClass()); assertThat(matcher.matches(getAppliedTransform(View.asIterable())), is(false)); http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java ---------------------------------------------------------------------- diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java index b79947e..83594f1 100644 --- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java +++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ParDoTranslationTest.java @@ -29,6 +29,7 @@ import org.apache.beam.model.pipeline.v1.RunnerApi; import org.apache.beam.model.pipeline.v1.RunnerApi.ParDoPayload; import org.apache.beam.model.pipeline.v1.RunnerApi.SideInput; import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; @@ -166,7 +167,8 @@ public class ParDoTranslationTest { view.getPCollection(), protoTransform, rehydratedComponents); - assertThat(restoredView.getTagInternal(), equalTo(view.getTagInternal())); + assertThat(restoredView.getTagInternal(), + Matchers.<TupleTag<?>>equalTo(view.getTagInternal())); assertThat(restoredView.getViewFn(), instanceOf(view.getViewFn().getClass())); assertThat( restoredView.getWindowMappingFn(), instanceOf(view.getWindowMappingFn().getClass())); @@ -174,7 +176,8 @@ public class ParDoTranslationTest { restoredView.getWindowingStrategyInternal(), Matchers.<WindowingStrategy<?, ?>>equalTo( view.getWindowingStrategyInternal().fixDefaults())); - assertThat(restoredView.getCoderInternal(), equalTo(view.getCoderInternal())); + assertThat(restoredView.getCoderInternal(), + Matchers.<Coder<?>>equalTo(view.getCoderInternal())); } String mainInputId = sdkComponents.registerPCollection(mainInput); assertThat( http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java new file mode 100644 index 0000000..b451547 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryMultimapSideInputView.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.Multimap; +import com.google.common.collect.Multimaps; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.Materializations.MultimapView; +import org.apache.beam.sdk.values.KV; + +/** + * An in-memory representation of {@link MultimapView}. + */ +public class InMemoryMultimapSideInputView<K, V> implements Materializations.MultimapView<K, V> { + + /** + * Creates a {@link MultimapView} from the provided values. The provided {@link Coder} is used + * to guarantee structural equality for keys instead of assuming Java object equality. + */ + public static <K, V> MultimapView<K, V> fromIterable( + Coder<K> keyCoder, Iterable<KV<K, V>> values) { + // We specifically use an array list multimap to allow for: + // * null keys + // * null values + // * duplicate values + Multimap<Object, Object> multimap = ArrayListMultimap.create(); + for (KV<K, V> value : values) { + multimap.put(keyCoder.structuralValue(value.getKey()), value.getValue()); + } + return new InMemoryMultimapSideInputView(keyCoder, Multimaps.unmodifiableMultimap(multimap)); + } + + private final Coder<K> keyCoder; + private final Multimap<Object, V> structuralKeyToValuesMap; + + private InMemoryMultimapSideInputView(Coder<K> keyCoder, Multimap<Object, V> data) { + this.keyCoder = keyCoder; + this.structuralKeyToValuesMap = data; + } + + @Override + public Iterable<V> get(K k) { + return structuralKeyToValuesMap.get(keyCoder.structuralValue(k)); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java index 3b37702..3ff4c94 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java @@ -17,22 +17,28 @@ */ package org.apache.beam.runners.core; -import java.util.ArrayList; +import static com.google.common.base.Preconditions.checkArgument; + import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; import javax.annotation.Nullable; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.SetCoder; import org.apache.beam.sdk.state.CombiningState; import org.apache.beam.sdk.state.ValueState; import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.Materializations.MultimapView; +import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; /** @@ -58,7 +64,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { /** * State internals that are scoped not to the key of a value but are global. The state can still - * be keep locally but if side inputs are broadcast to all parallel operators then all will + * be kept locally but if side inputs are broadcast to all parallel operators then all will * have the same view of the state. */ private final StateInternals stateInternals; @@ -80,7 +86,7 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { */ private final Map< PCollectionView<?>, - StateTag<ValueState<Iterable<WindowedValue<?>>>>> sideInputContentsTags; + StateTag<ValueState<Iterable<?>>>> sideInputContentsTags; /** * Creates a new {@code SideInputHandler} for the given side inputs that uses @@ -94,7 +100,15 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { this.availableWindowsTags = new HashMap<>(); this.sideInputContentsTags = new HashMap<>(); - for (PCollectionView<?> sideInput: sideInputs) { + for (PCollectionView<?> sideInput : sideInputs) { + checkArgument( + Materializations.MULTIMAP_MATERIALIZATION_URN.equals( + sideInput.getViewFn().getMaterialization().getUrn()), + "This handler is only capable of dealing with %s materializations " + + "but was asked to handle %s for PCollectionView with tag %s.", + Materializations.MULTIMAP_MATERIALIZATION_URN, + sideInput.getViewFn().getMaterialization().getUrn(), + sideInput.getTagInternal().getId()); @SuppressWarnings("unchecked") Coder<BoundedWindow> windowCoder = @@ -114,9 +128,9 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { availableWindowsTags.put(sideInput, availableTag); - Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal(); - StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag = - StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(), coder); + StateTag<ValueState<Iterable<?>>> stateTag = + StateTags.value("side-input-data-" + sideInput.getTagInternal().getId(), + (Coder) IterableCoder.of(sideInput.getCoderInternal())); sideInputContentsTags.put(sideInput, stateTag); } } @@ -129,7 +143,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { public void addSideInputValue( PCollectionView<?> sideInput, WindowedValue<Iterable<?>> value) { - @SuppressWarnings("unchecked") Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) sideInput @@ -137,19 +150,13 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { .getWindowFn() .windowCoder(); - // reify the WindowedValue - List<WindowedValue<?>> inputWithReifiedWindows = new ArrayList<>(); - for (Object e: value.getValue()) { - inputWithReifiedWindows.add(value.withValue(e)); - } - - StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag = + StateTag<ValueState<Iterable<?>>> stateTag = sideInputContentsTags.get(sideInput); - for (BoundedWindow window: value.getWindows()) { + for (BoundedWindow window : value.getWindows()) { stateInternals .state(StateNamespaces.window(windowCoder, window), stateTag) - .write(inputWithReifiedWindows); + .write(value.getValue()); stateInternals .state(StateNamespaces.global(), availableWindowsTags.get(sideInput)) @@ -159,28 +166,32 @@ public class SideInputHandler implements ReadyCheckingSideInputReader { @Nullable @Override - public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) { - + public <T> T get(PCollectionView<T> view, BoundedWindow window) { @SuppressWarnings("unchecked") Coder<BoundedWindow> windowCoder = - (Coder<BoundedWindow>) sideInput + (Coder<BoundedWindow>) view .getWindowingStrategyInternal() .getWindowFn() .windowCoder(); - StateTag<ValueState<Iterable<WindowedValue<?>>>> stateTag = - sideInputContentsTags.get(sideInput); + StateTag<ValueState<Iterable<?>>> stateTag = + sideInputContentsTags.get(view); - ValueState<Iterable<WindowedValue<?>>> state = + ValueState<Iterable<?>> state = stateInternals.state(StateNamespaces.window(windowCoder, window), stateTag); - @Nullable Iterable<WindowedValue<?>> elements = state.read(); + // TODO: Add support for choosing which representation is contained based upon the + // side input materialization. We currently can assume that we always have a multimap + // materialization as that is the only supported type within the Java SDK. + @Nullable Iterable<KV<?, ?>> elements = (Iterable<KV<?, ?>>) state.read(); if (elements == null) { elements = Collections.emptyList(); } - return sideInput.getViewFn().apply(elements); + ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn(); + Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder(); + return viewFn.apply(InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java new file mode 100644 index 0000000..6840355 --- /dev/null +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/InMemoryMultimapSideInputViewTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import org.apache.beam.sdk.coders.ByteArrayCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.Materializations.MultimapView; +import org.apache.beam.sdk.values.KV; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link InMemoryMultimapSideInputView}. */ +@RunWith(JUnit4.class) +public class InMemoryMultimapSideInputViewTest { + @Test + public void testStructuralKeyEquality() { + MultimapView<byte[], Integer> view = InMemoryMultimapSideInputView.fromIterable( + ByteArrayCoder.of(), + ImmutableList.of(KV.of(new byte[]{ 0x00 }, 0), KV.of(new byte[]{ 0x01 }, 1))); + assertEquals(view.get(new byte[]{ 0x00 }), ImmutableList.of(0)); + assertEquals(view.get(new byte[]{ 0x01 }), ImmutableList.of(1)); + assertEquals(view.get(new byte[]{ 0x02 }), ImmutableList.of()); + } + + @Test + public void testValueGrouping() { + MultimapView<String, String> view = InMemoryMultimapSideInputView.fromIterable( + StringUtf8Coder.of(), + ImmutableList.of(KV.of("A", "a1"), KV.of("A", "a2"), KV.of("B", "b1"))); + assertEquals(view.get("A"), ImmutableList.of("a1", "a2")); + assertEquals(view.get("B"), ImmutableList.of("b1")); + assertEquals(view.get("C"), ImmutableList.of()); + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java index f9e0aaf..7cbd1b0 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SideInputHandlerTest.java @@ -17,24 +17,28 @@ */ package org.apache.beam.runners.core; +import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor; import static org.hamcrest.Matchers.contains; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import com.google.common.collect.ImmutableList; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.testing.PCollectionViewTesting; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Duration; import org.joda.time.Instant; -import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -47,26 +51,19 @@ public class SideInputHandlerTest { private static final long WINDOW_MSECS_1 = 100; private static final long WINDOW_MSECS_2 = 500; - - private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 = - WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1))); - - private PCollectionView<Iterable<String>> view1 = - PCollectionViewTesting.testingView( - new TupleTag<Iterable<WindowedValue<String>>>() {}, - new PCollectionViewTesting.IdentityViewFn<String>(), - StringUtf8Coder.of(), - windowingStrategy1); - - private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 = - WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2))); - - private PCollectionView<Iterable<String>> view2 = - PCollectionViewTesting.testingView( - new TupleTag<Iterable<WindowedValue<String>>>() {}, - new PCollectionViewTesting.IdentityViewFn<String>(), - StringUtf8Coder.of(), - windowingStrategy2); + private PCollectionView<Iterable<String>> view1; + private PCollectionView<Iterable<String>> view2; + + @Before + public void setUp() { + PCollection<String> pc = Pipeline.create().apply(Create.of("1")); + view1 = pc + .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_1)))) + .apply(View.<String>asIterable()); + view2 = pc + .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_2)))) + .apply(View.<String>asIterable()); + } @Test public void testIsEmpty() { @@ -113,7 +110,9 @@ public class SideInputHandlerTest { // add a value for view1 sideInputHandler.addSideInputValue( view1, - valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow)); + valuesInWindow( + materializeValuesFor(View.asIterable(), "Hello"), + new Instant(0), firstWindow)); // now side input should be ready assertTrue(sideInputHandler.isReady(view1, firstWindow)); @@ -139,16 +138,20 @@ public class SideInputHandlerTest { // add a first value for view1 sideInputHandler.addSideInputValue( view1, - valuesInWindow(ImmutableList.of("Hello"), new Instant(0), window)); + valuesInWindow( + materializeValuesFor(View.asIterable(), "Hello"), + new Instant(0), window)); - Assert.assertThat(sideInputHandler.get(view1, window), contains("Hello")); + assertThat(sideInputHandler.get(view1, window), contains("Hello")); // subsequent values should replace existing values sideInputHandler.addSideInputValue( view1, - valuesInWindow(ImmutableList.of("Ciao", "Buongiorno"), new Instant(0), window)); + valuesInWindow( + materializeValuesFor(View.asIterable(), "Ciao", "Buongiorno"), + new Instant(0), window)); - Assert.assertThat(sideInputHandler.get(view1, window), contains("Ciao", "Buongiorno")); + assertThat(sideInputHandler.get(view1, window), contains("Ciao", "Buongiorno")); } @Test @@ -166,19 +169,21 @@ public class SideInputHandlerTest { // add a first value for view1 in the first window sideInputHandler.addSideInputValue( view1, - valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow)); + valuesInWindow(materializeValuesFor(View.asIterable(), "Hello"), + new Instant(0), firstWindow)); - Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello")); + assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello")); // add something for second window of view1 sideInputHandler.addSideInputValue( view1, - valuesInWindow(ImmutableList.of("Arrivederci"), new Instant(0), secondWindow)); + valuesInWindow(materializeValuesFor(View.asIterable(), "Arrivederci"), + new Instant(0), secondWindow)); - Assert.assertThat(sideInputHandler.get(view1, secondWindow), contains("Arrivederci")); + assertThat(sideInputHandler.get(view1, secondWindow), contains("Arrivederci")); // contents for first window should be unaffected - Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello")); + assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello")); } @Test @@ -194,9 +199,10 @@ public class SideInputHandlerTest { // add value for view1 in the first window sideInputHandler.addSideInputValue( view1, - valuesInWindow(ImmutableList.of("Hello"), new Instant(0), firstWindow)); + valuesInWindow(materializeValuesFor(View.asIterable(), "Hello"), + new Instant(0), firstWindow)); - Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello")); + assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello")); // view2 should not have any data assertFalse(sideInputHandler.isReady(view2, firstWindow)); @@ -204,18 +210,19 @@ public class SideInputHandlerTest { // also add some data for view2 sideInputHandler.addSideInputValue( view2, - valuesInWindow(ImmutableList.of("Salut"), new Instant(0), firstWindow)); + valuesInWindow(materializeValuesFor(View.asIterable(), "Salut"), + new Instant(0), firstWindow)); assertTrue(sideInputHandler.isReady(view2, firstWindow)); - Assert.assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut")); + assertThat(sideInputHandler.get(view2, firstWindow), contains("Salut")); // view1 should not be affected by that - Assert.assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello")); + assertThat(sideInputHandler.get(view1, firstWindow), contains("Hello")); } @SuppressWarnings({"unchecked", "rawtypes"}) private WindowedValue<Iterable<?>> valuesInWindow( - Iterable<?> values, Instant timestamp, BoundedWindow window) { + List<Object> values, Instant timestamp, BoundedWindow window) { return (WindowedValue) WindowedValue.of(values, timestamp, window, PaneInfo.NO_FIRING); } } http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java index 43da92f..ea8f168 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SideInputContainer.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.direct; import static com.google.common.base.Preconditions.checkArgument; +import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.base.Optional; import com.google.common.cache.CacheBuilder; @@ -35,11 +36,18 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; +import org.apache.beam.runners.core.InMemoryMultimapSideInputView; import org.apache.beam.runners.core.ReadyCheckingSideInputReader; import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.Materializations.MultimapView; +import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.WindowingStrategy; @@ -60,6 +68,16 @@ class SideInputContainer { */ public static SideInputContainer create( final EvaluationContext context, Collection<PCollectionView<?>> containedViews) { + for (PCollectionView<?> pCollectionView : containedViews) { + checkArgument( + Materializations.MULTIMAP_MATERIALIZATION_URN.equals( + pCollectionView.getViewFn().getMaterialization().getUrn()), + "This handler is only capable of dealing with %s materializations " + + "but was asked to handle %s for PCollectionView with tag %s.", + Materializations.MULTIMAP_MATERIALIZATION_URN, + pCollectionView.getViewFn().getMaterialization().getUrn(), + pCollectionView.getTagInternal().getId()); + } LoadingCache<PCollectionViewWindow<?>, AtomicReference<Iterable<? extends WindowedValue<?>>>> viewByWindows = CacheBuilder.newBuilder().build(new CallbackSchedulingLoader(context)); return new SideInputContainer(containedViews, viewByWindows); @@ -239,11 +257,21 @@ class SideInputContainer { "calling get() on PCollectionView %s that is not ready in window %s", view, window); - // Safe covariant cast - @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> values = - (Iterable<WindowedValue<?>>) viewContents.getUnchecked(PCollectionViewWindow.of(view, - window)).get(); - return view.getViewFn().apply(values); + // Safe covariant cast since we know that the view only contains KVs. + @SuppressWarnings("unchecked") Iterable<KV<?, ?>> elements = Iterables.transform( + (Iterable<WindowedValue<KV<?, ?>>>) viewContents.getUnchecked( + PCollectionViewWindow.of(view, window)).get(), + new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() { + @Override + public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) { + return windowedValue.getValue(); + } + }); + + ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn(); + Coder<?> keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder(); + return viewFn.apply( + InMemoryMultimapSideInputView.fromIterable(keyCoder, (Iterable) elements)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java index cc9ce60..0a1ffe7 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/EvaluationContextTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; @@ -28,7 +29,6 @@ import static org.junit.Assert.assertThat; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import java.util.Collection; -import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.SideInputReader; @@ -41,8 +41,10 @@ import org.apache.beam.runners.direct.WatermarkManager.FiredTimers; import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.coders.VarIntCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.runners.AppliedPTransform; @@ -126,33 +128,47 @@ public class EvaluationContextTest { @Test public void writeToViewWriterThenReadReads() { - PCollectionViewWriter<Integer, Iterable<Integer>> viewWriter = + PCollectionViewWriter<?, Iterable<Integer>> viewWriter = context.createPCollectionViewWriter( PCollection.createPrimitiveOutputInternal( p, WindowingStrategy.globalDefault(), IsBounded.BOUNDED, - IterableCoder.of(VarIntCoder.of())), + IterableCoder.of(KvCoder.of(VoidCoder.of(), VarIntCoder.of()))), view); BoundedWindow window = new TestBoundedWindow(new Instant(1024L)); BoundedWindow second = new TestBoundedWindow(new Instant(899999L)); - WindowedValue<Integer> firstValue = - WindowedValue.of(1, new Instant(1222), window, PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue<Integer> secondValue = - WindowedValue.of( - 2, new Instant(8766L), second, PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0)); - Iterable<WindowedValue<Integer>> values = ImmutableList.of(firstValue, secondValue); - viewWriter.add(values); + ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder(); + for (Object materializedValue : materializeValuesFor(View.asIterable(), 1)) { + valuesBuilder.add(WindowedValue.of( + materializedValue, + new Instant(1222), + window, + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + } + for (Object materializedValue : materializeValuesFor(View.asIterable(), 2)) { + valuesBuilder.add(WindowedValue.of( + materializedValue, + new Instant(8766L), + second, + PaneInfo.createPane(true, false, Timing.ON_TIME, 0, 0))); + } + viewWriter.add((Iterable) valuesBuilder.build()); SideInputReader reader = context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view)); assertThat(reader.get(view, window), containsInAnyOrder(1)); assertThat(reader.get(view, second), containsInAnyOrder(2)); - WindowedValue<Integer> overrittenSecondValue = - WindowedValue.of( - 4444, new Instant(8677L), second, PaneInfo.createPane(false, true, Timing.LATE, 1, 1)); - viewWriter.add(Collections.singleton(overrittenSecondValue)); + ImmutableList.Builder<WindowedValue<?>> overwrittenValuesBuilder = ImmutableList.builder(); + for (Object materializedValue : materializeValuesFor(View.asIterable(), 4444)) { + overwrittenValuesBuilder.add(WindowedValue.of( + materializedValue, + new Instant(8677L), + second, + PaneInfo.createPane(false, true, Timing.LATE, 1, 1))); + } + viewWriter.add((Iterable) overwrittenValuesBuilder.build()); assertThat(reader.get(view, second), containsInAnyOrder(2)); // The cached value is served in the earlier reader reader = context.createSideInputReader(ImmutableList.<PCollectionView<?>>of(view)); http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java index 5e7c799..91255e0 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/SideInputContainerTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.direct; +import static org.apache.beam.sdk.testing.PCollectionViewTesting.materializeValuesFor; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasEntry; @@ -34,8 +35,6 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.beam.runners.core.ReadyCheckingSideInputReader; import org.apache.beam.runners.core.SideInputReader; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Mean; @@ -49,9 +48,7 @@ import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.TypeDescriptor; -import org.apache.beam.sdk.values.WindowingStrategy; import org.joda.time.Instant; import org.junit.Before; import org.junit.Rule; @@ -134,13 +131,22 @@ public class SideInputContainerTest { @Test public void getAfterWriteReturnsPaneInWindow() throws Exception { - WindowedValue<KV<String, Integer>> one = - WindowedValue.of( - KV.of("one", 1), new Instant(1L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue<KV<String, Integer>> two = - WindowedValue.of( - KV.of("two", 2), new Instant(20L), FIRST_WINDOW, PaneInfo.ON_TIME_AND_ONLY_FIRING); - container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two)); + ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder(); + for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) { + valuesBuilder.add(WindowedValue.of( + materializedValue, + new Instant(1L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + } + for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) { + valuesBuilder.add(WindowedValue.of( + materializedValue, + new Instant(20L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + } + container.write(mapView, valuesBuilder.build()); Map<String, Integer> viewContents = container @@ -153,19 +159,22 @@ public class SideInputContainerTest { @Test public void getReturnsLatestPaneInWindow() throws Exception { - WindowedValue<KV<String, Integer>> one = - WindowedValue.of( - KV.of("one", 1), - new Instant(1L), - SECOND_WINDOW, - PaneInfo.createPane(true, false, Timing.EARLY)); - WindowedValue<KV<String, Integer>> two = - WindowedValue.of( - KV.of("two", 2), - new Instant(20L), - SECOND_WINDOW, - PaneInfo.createPane(true, false, Timing.EARLY)); - container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two)); + ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder(); + for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) { + valuesBuilder.add(WindowedValue.of( + materializedValue, + new Instant(1L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY))); + } + for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) { + valuesBuilder.add(WindowedValue.of( + materializedValue, + new Instant(20L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY))); + } + container.write(mapView, valuesBuilder.build()); Map<String, Integer> viewContents = container @@ -175,13 +184,15 @@ public class SideInputContainerTest { assertThat(viewContents, hasEntry("two", 2)); assertThat(viewContents.size(), is(2)); - WindowedValue<KV<String, Integer>> three = - WindowedValue.of( - KV.of("three", 3), - new Instant(300L), - SECOND_WINDOW, - PaneInfo.createPane(false, false, Timing.EARLY, 1, -1)); - container.write(mapView, ImmutableList.<WindowedValue<?>>of(three)); + ImmutableList.Builder<WindowedValue<?>> overwriteValuesBuilder = ImmutableList.builder(); + for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("three", 3))) { + overwriteValuesBuilder.add(WindowedValue.of( + materializedValue, + new Instant(300L), + SECOND_WINDOW, + PaneInfo.createPane(false, false, Timing.EARLY, 1, -1))); + } + container.write(mapView, overwriteValuesBuilder.build()); Map<String, Integer> overwrittenViewContents = container @@ -209,10 +220,7 @@ public class SideInputContainerTest { PCollection<KV<String, String>> input = pipeline.apply(Create.empty(new TypeDescriptor<KV<String, String>>() {})); PCollectionView<Map<String, Iterable<String>>> newView = - PCollectionViews.multimapView( - input, - WindowingStrategy.globalDefault(), - KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + input.apply(View.<String, String>asMultimap()); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("unknown views"); @@ -232,19 +240,22 @@ public class SideInputContainerTest { @Test public void writeForMultipleElementsInDifferentWindowsSucceeds() throws Exception { - WindowedValue<Double> firstWindowedValue = - WindowedValue.of( - 2.875, - FIRST_WINDOW.maxTimestamp().minus(200L), - FIRST_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue<Double> secondWindowedValue = - WindowedValue.of( - 4.125, - SECOND_WINDOW.maxTimestamp().minus(2_000_000L), - SECOND_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING); - container.write(singletonView, ImmutableList.of(firstWindowedValue, secondWindowedValue)); + ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder(); + for (Object materializedValue : materializeValuesFor(View.asSingleton(), 2.875)) { + valuesBuilder.add(WindowedValue.of( + materializedValue, + FIRST_WINDOW.maxTimestamp().minus(200L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + } + for (Object materializedValue : materializeValuesFor(View.asSingleton(), 4.125)) { + valuesBuilder.add(WindowedValue.of( + materializedValue, + SECOND_WINDOW.maxTimestamp().minus(2_000_000L), + SECOND_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + } + container.write(singletonView, valuesBuilder.build()); assertThat( container .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)) @@ -259,20 +270,15 @@ public class SideInputContainerTest { @Test public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception { - WindowedValue<Integer> firstValue = - WindowedValue.of( - 44, - FIRST_WINDOW.maxTimestamp().minus(200L), - FIRST_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING); - WindowedValue<Integer> secondValue = - WindowedValue.of( - 44, - FIRST_WINDOW.maxTimestamp().minus(200L), - FIRST_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING); - - container.write(iterableView, ImmutableList.of(firstValue, secondValue)); + ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder(); + for (Object materializedValue : materializeValuesFor(View.asIterable(), 44, 44)) { + valuesBuilder.add(WindowedValue.of( + materializedValue, + FIRST_WINDOW.maxTimestamp().minus(200L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + } + container.write(iterableView, valuesBuilder.build()); assertThat( container @@ -283,13 +289,15 @@ public class SideInputContainerTest { @Test public void writeForElementInMultipleWindowsSucceeds() throws Exception { - WindowedValue<Double> multiWindowedValue = - WindowedValue.of( - 2.875, - FIRST_WINDOW.maxTimestamp().minus(200L), - ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - container.write(singletonView, ImmutableList.of(multiWindowedValue)); + ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder(); + for (Object materializedValue : materializeValuesFor(View.asSingleton(), 2.875)) { + valuesBuilder.add(WindowedValue.of( + materializedValue, + FIRST_WINDOW.maxTimestamp().minus(200L), + ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW), + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + } + container.write(singletonView, valuesBuilder.build()); assertThat( container .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView)) @@ -304,19 +312,22 @@ public class SideInputContainerTest { @Test public void finishDoesNotOverwriteWrittenElements() throws Exception { - WindowedValue<KV<String, Integer>> one = - WindowedValue.of( - KV.of("one", 1), - new Instant(1L), - SECOND_WINDOW, - PaneInfo.createPane(true, false, Timing.EARLY)); - WindowedValue<KV<String, Integer>> two = - WindowedValue.of( - KV.of("two", 2), - new Instant(20L), - SECOND_WINDOW, - PaneInfo.createPane(true, false, Timing.EARLY)); - container.write(mapView, ImmutableList.<WindowedValue<?>>of(one, two)); + ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder(); + for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) { + valuesBuilder.add(WindowedValue.of( + materializedValue, + new Instant(1L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY))); + } + for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("two", 2))) { + valuesBuilder.add(WindowedValue.of( + materializedValue, + new Instant(20L), + SECOND_WINDOW, + PaneInfo.createPane(true, false, Timing.EARLY))); + } + container.write(mapView, valuesBuilder.build()); immediatelyInvokeCallback(mapView, SECOND_WINDOW); @@ -362,14 +373,15 @@ public class SideInputContainerTest { */ @Test public void isReadyForSomeNotReadyViewsFalseUntilElements() { - container.write( - mapView, - ImmutableList.of( - WindowedValue.of( - KV.of("one", 1), - SECOND_WINDOW.maxTimestamp().minus(100L), - SECOND_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING))); + ImmutableList.Builder<WindowedValue<?>> mapValuesBuilder = ImmutableList.builder(); + for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("one", 1))) { + mapValuesBuilder.add(WindowedValue.of( + materializedValue, + SECOND_WINDOW.maxTimestamp().minus(100L), + SECOND_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + } + container.write(mapView, mapValuesBuilder.build()); ReadyCheckingSideInputReader reader = container.createReaderForViews(ImmutableList.of(mapView, singletonView)); @@ -378,25 +390,27 @@ public class SideInputContainerTest { assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); - container.write( - mapView, - ImmutableList.of( - WindowedValue.of( - KV.of("too", 2), - FIRST_WINDOW.maxTimestamp().minus(100L), - FIRST_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING))); + ImmutableList.Builder<WindowedValue<?>> newMapValuesBuilder = ImmutableList.builder(); + for (Object materializedValue : materializeValuesFor(View.asMap(), KV.of("too", 2))) { + newMapValuesBuilder.add(WindowedValue.of( + materializedValue, + FIRST_WINDOW.maxTimestamp().minus(100L), + FIRST_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + } + container.write(mapView, newMapValuesBuilder.build()); // Cached value is false assertThat(reader.isReady(mapView, FIRST_WINDOW), is(false)); - container.write( - singletonView, - ImmutableList.of( - WindowedValue.of( - 1.25, - SECOND_WINDOW.maxTimestamp().minus(100L), - SECOND_WINDOW, - PaneInfo.ON_TIME_AND_ONLY_FIRING))); + ImmutableList.Builder<WindowedValue<?>> singletonValuesBuilder = ImmutableList.builder(); + for (Object materializedValue : materializeValuesFor(View.asSingleton(), 1.25)) { + singletonValuesBuilder.add(WindowedValue.of( + materializedValue, + SECOND_WINDOW.maxTimestamp().minus(100L), + SECOND_WINDOW, + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + } + container.write(singletonView, singletonValuesBuilder.build()); assertThat(reader.isReady(mapView, SECOND_WINDOW), is(true)); assertThat(reader.isReady(singletonView, SECOND_WINDOW), is(false)); http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java index 5bc48b7..3716ec8 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewEvaluatorFactoryTest.java @@ -32,11 +32,11 @@ import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.Values; -import org.apache.beam.sdk.transforms.View.CreatePCollectionView; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.WithKeys; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionViews; +import org.apache.beam.sdk.values.PCollectionView; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -56,10 +56,7 @@ public class ViewEvaluatorFactoryTest { @Test public void testInMemoryEvaluator() throws Exception { PCollection<String> input = p.apply(Create.of("foo", "bar")); - CreatePCollectionView<String, Iterable<String>> createView = - CreatePCollectionView.of( - PCollectionViews.iterableView( - input, input.getWindowingStrategy(), StringUtf8Coder.of())); + PCollectionView<Iterable<String>> pCollectionView = input.apply(View.<String>asIterable()); PCollection<Iterable<String>> concat = input.apply(WithKeys.<Void, String>of((Void) null)) .setCoder(KvCoder.of(VoidCoder.of(), StringUtf8Coder.of())) @@ -67,11 +64,11 @@ public class ViewEvaluatorFactoryTest { .apply(Values.<Iterable<String>>create()); PCollection<Iterable<String>> view = concat.apply( - new ViewOverrideFactory.WriteView<String, Iterable<String>>(createView.getView())); + new ViewOverrideFactory.WriteView<String, Iterable<String>>(pCollectionView)); EvaluationContext context = mock(EvaluationContext.class); TestViewWriter<String, Iterable<String>> viewWriter = new TestViewWriter<>(); - when(context.createPCollectionViewWriter(concat, createView.getView())).thenReturn(viewWriter); + when(context.createPCollectionViewWriter(concat, pCollectionView)).thenReturn(viewWriter); CommittedBundle<String> inputBundle = bundleFactory.createBundle(input).commit(Instant.now()); AppliedPTransform<?, ?, ?> producer = DirectGraphs.getProducer(view); http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java index 94d8d70..556cac5 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ViewOverrideFactoryTest.java @@ -34,13 +34,13 @@ import org.apache.beam.sdk.runners.TransformHierarchy.Node; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.View.CreatePCollectionView; import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PCollectionViews; -import org.apache.beam.sdk.values.WindowingStrategy; +import org.apache.beam.sdk.values.TupleTag; import org.hamcrest.Matchers; import org.junit.Rule; import org.junit.Test; @@ -59,8 +59,7 @@ public class ViewOverrideFactoryTest implements Serializable { @Test public void replacementGetViewReturnsOriginal() { final PCollection<Integer> ints = p.apply("CreateContents", Create.of(1, 2, 3)); - final PCollectionView<List<Integer>> view = - PCollectionViews.listView(ints, WindowingStrategy.globalDefault(), ints.getCoder()); + final PCollectionView<List<Integer>> view = ints.apply(View.<Integer>asList()); PTransformReplacement<PCollection<Integer>, PCollection<Integer>> replacement = factory.getReplacementTransform( AppliedPTransform @@ -89,7 +88,7 @@ public class ViewOverrideFactoryTest implements Serializable { // so not asserted one way or the other assertThat( replacementView.getTagInternal(), - equalTo(view.getTagInternal())); + equalTo((TupleTag) view.getTagInternal())); assertThat( replacementView.getViewFn(), Matchers.<ViewFn<?, ?>>equalTo(view.getViewFn())); http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java index 79a23cc..cffcc5a 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java @@ -38,7 +38,6 @@ import java.util.List; import java.util.UUID; import javax.annotation.Nullable; import org.apache.beam.runners.direct.WriteWithShardingFactory.CalculateShardsFn; -import org.apache.beam.sdk.coders.VarLongCoder; import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.DynamicFileDestinations; import org.apache.beam.sdk.io.FileBasedSink; @@ -56,15 +55,14 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFnTester; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PCollectionViews; import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -217,9 +215,8 @@ public class WriteWithShardingFactoryTest implements Serializable { public void keyBasedOnCountFnFewElementsExtraShards() throws Exception { long countValue = (long) WriteWithShardingFactory.MIN_SHARDS_FOR_LOG + 3; PCollection<Long> inputCount = p.apply(Create.of(countValue)); - PCollectionView<Long> elementCountView = - PCollectionViews.singletonView( - inputCount, WindowingStrategy.globalDefault(), true, 0L, VarLongCoder.of()); + PCollectionView<Long> elementCountView = inputCount.apply( + View.<Long>asSingleton().withDefaultValue(countValue)); CalculateShardsFn fn = new CalculateShardsFn(3); DoFnTester<Long, Integer> fnTester = DoFnTester.of(fn); http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index cec01f8..aa5cc39 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -307,7 +307,6 @@ class FlinkStreamingTransformTranslators { intToViewMapping.put(count, sideInput); tagToIntMapping.put(tag, count); count++; - Coder<Iterable<WindowedValue<?>>> coder = sideInput.getCoderInternal(); } http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java index f275290..fb3f375 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSideInputReader.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.flink.translation.functions; +import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; import java.util.Collections; @@ -24,8 +25,10 @@ import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; import org.apache.beam.runners.core.SideInputReader; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.Materializations.MultimapView; +import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -35,6 +38,13 @@ import org.apache.flink.api.common.functions.RuntimeContext; * A {@link SideInputReader} for the Flink Batch Runner. */ public class FlinkSideInputReader implements SideInputReader { + /** A {@link MultimapView} which always returns an empty iterable. */ + private static final MultimapView EMPTY_MULTMAP_VIEW = new MultimapView() { + @Override + public Iterable get(Object o) { + return Collections.EMPTY_LIST; + } + }; private final Map<TupleTag<?>, WindowingStrategy<?, ?>> sideInputs; @@ -42,6 +52,16 @@ public class FlinkSideInputReader implements SideInputReader { public FlinkSideInputReader(Map<PCollectionView<?>, WindowingStrategy<?, ?>> indexByView, RuntimeContext runtimeContext) { + for (PCollectionView<?> view : indexByView.keySet()) { + checkArgument( + Materializations.MULTIMAP_MATERIALIZATION_URN.equals( + view.getViewFn().getMaterialization().getUrn()), + "This handler is only capable of dealing with %s materializations " + + "but was asked to handle %s for PCollectionView with tag %s.", + Materializations.MULTIMAP_MATERIALIZATION_URN, + view.getViewFn().getMaterialization().getUrn(), + view.getTagInternal().getId()); + } sideInputs = new HashMap<>(); for (Map.Entry<PCollectionView<?>, WindowingStrategy<?, ?>> entry : indexByView.entrySet()) { sideInputs.put(entry.getKey().getTagInternal(), entry.getValue()); @@ -53,7 +73,7 @@ public class FlinkSideInputReader implements SideInputReader { @Override public <T> T get(PCollectionView<T> view, BoundedWindow window) { checkNotNull(view, "View passed to sideInput cannot be null"); - TupleTag<Iterable<WindowedValue<?>>> tag = view.getTagInternal(); + TupleTag<?> tag = view.getTagInternal(); checkNotNull( sideInputs.get(tag), "Side input for " + view + " not available."); @@ -63,7 +83,8 @@ public class FlinkSideInputReader implements SideInputReader { tag.getId(), new SideInputInitializer<>(view)); T result = sideInputs.get(window); if (result == null) { - result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList()); + ViewFn<MultimapView, T> viewFn = (ViewFn<MultimapView, T>) view.getViewFn(); + result = viewFn.apply(EMPTY_MULTMAP_VIEW); } return result; } http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java index 12222b4..782f72a 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SideInputInitializer.java @@ -17,12 +17,23 @@ */ package org.apache.beam.runners.flink.translation.functions; +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.base.Function; +import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.beam.runners.core.InMemoryMultimapSideInputView; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.transforms.Materializations; +import org.apache.beam.sdk.transforms.Materializations.MultimapView; +import org.apache.beam.sdk.transforms.ViewFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.flink.api.common.functions.BroadcastVariableInitializer; @@ -30,24 +41,33 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer; * {@link BroadcastVariableInitializer} that initializes the broadcast input as a {@code Map} * from window to side input. */ -public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow> - implements BroadcastVariableInitializer<WindowedValue<ElemT>, Map<BoundedWindow, ViewT>> { +public class SideInputInitializer<ViewT> + implements BroadcastVariableInitializer<WindowedValue<?>, Map<BoundedWindow, ViewT>> { PCollectionView<ViewT> view; public SideInputInitializer(PCollectionView<ViewT> view) { + checkArgument( + Materializations.MULTIMAP_MATERIALIZATION_URN.equals( + view.getViewFn().getMaterialization().getUrn()), + "This handler is only capable of dealing with %s materializations " + + "but was asked to handle %s for PCollectionView with tag %s.", + Materializations.MULTIMAP_MATERIALIZATION_URN, + view.getViewFn().getMaterialization().getUrn(), + view.getTagInternal().getId()); this.view = view; } @Override public Map<BoundedWindow, ViewT> initializeBroadcastVariable( - Iterable<WindowedValue<ElemT>> inputValues) { + Iterable<WindowedValue<?>> inputValues) { // first partition into windows - Map<BoundedWindow, List<WindowedValue<ElemT>>> partitionedElements = new HashMap<>(); - for (WindowedValue<ElemT> value: inputValues) { + Map<BoundedWindow, List<WindowedValue<KV<?, ?>>>> partitionedElements = new HashMap<>(); + for (WindowedValue<KV<?, ?>> value + : (Iterable<WindowedValue<KV<?, ?>>>) (Iterable) inputValues) { for (BoundedWindow window: value.getWindows()) { - List<WindowedValue<ElemT>> windowedValues = partitionedElements.get(window); + List<WindowedValue<KV<?, ?>>> windowedValues = partitionedElements.get(window); if (windowedValues == null) { windowedValues = new ArrayList<>(); partitionedElements.put(window, windowedValues); @@ -58,14 +78,20 @@ public class SideInputInitializer<ElemT, ViewT, W extends BoundedWindow> Map<BoundedWindow, ViewT> resultMap = new HashMap<>(); - for (Map.Entry<BoundedWindow, List<WindowedValue<ElemT>>> elements: + for (Map.Entry<BoundedWindow, List<WindowedValue<KV<?, ?>>>> elements: partitionedElements.entrySet()) { - @SuppressWarnings("unchecked") - Iterable<WindowedValue<?>> elementsIterable = - (List<WindowedValue<?>>) (List<?>) elements.getValue(); - - resultMap.put(elements.getKey(), view.getViewFn().apply(elementsIterable)); + ViewFn<MultimapView, ViewT> viewFn = (ViewFn<MultimapView, ViewT>) view.getViewFn(); + Coder keyCoder = ((KvCoder<?, ?>) view.getCoderInternal()).getKeyCoder(); + resultMap.put(elements.getKey(), viewFn.apply(InMemoryMultimapSideInputView.fromIterable( + keyCoder, + (Iterable) Iterables.transform(elements.getValue(), + new Function<WindowedValue<KV<?, ?>>, KV<?, ?>>() { + @Override + public KV<?, ?> apply(WindowedValue<KV<?, ?>> windowedValue) { + return windowedValue.getValue(); + } + })))); } return resultMap; http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index ad17de8..33ac024f 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -35,6 +35,7 @@ import org.apache.beam.runners.flink.FlinkPipelineOptions; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; @@ -47,16 +48,19 @@ import org.apache.beam.sdk.state.Timer; import org.apache.beam.sdk.state.TimerSpec; import org.apache.beam.sdk.state.TimerSpecs; import org.apache.beam.sdk.state.ValueState; -import org.apache.beam.sdk.testing.PCollectionViewTesting; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.View; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.WindowingStrategy; @@ -71,6 +75,7 @@ import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness; import org.apache.flink.util.OutputTag; import org.joda.time.Duration; import org.joda.time.Instant; +import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -84,26 +89,19 @@ public class DoFnOperatorTest { // views and windows for testing side inputs private static final long WINDOW_MSECS_1 = 100; private static final long WINDOW_MSECS_2 = 500; - - private WindowingStrategy<Object, IntervalWindow> windowingStrategy1 = - WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_1))); - - private PCollectionView<Iterable<String>> view1 = - PCollectionViewTesting.testingView( - new TupleTag<Iterable<WindowedValue<String>>>() {}, - new PCollectionViewTesting.IdentityViewFn<String>(), - StringUtf8Coder.of(), - windowingStrategy1); - - private WindowingStrategy<Object, IntervalWindow> windowingStrategy2 = - WindowingStrategy.of(FixedWindows.of(new Duration(WINDOW_MSECS_2))); - - private PCollectionView<Iterable<String>> view2 = - PCollectionViewTesting.testingView( - new TupleTag<Iterable<WindowedValue<String>>>() {}, - new PCollectionViewTesting.IdentityViewFn<String>(), - StringUtf8Coder.of(), - windowingStrategy2); + private PCollectionView<Iterable<String>> view1; + private PCollectionView<Iterable<String>> view2; + + @Before + public void setUp() { + PCollection<String> pc = Pipeline.create().apply(Create.of("1")); + view1 = pc + .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_1)))) + .apply(View.<String>asIterable()); + view2 = pc + .apply(Window.<String>into(FixedWindows.of(new Duration(WINDOW_MSECS_2)))) + .apply(View.<String>asIterable()); + } @Test @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index 81e7a97..cc43e27 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -991,15 +991,15 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertAllStepOutputsHaveUniqueIds(job); List<Step> steps = job.getSteps(); - assertEquals(9, steps.size()); + assertEquals(10, steps.size()); @SuppressWarnings("unchecked") List<Map<String, Object>> toIsmRecordOutputs = - (List<Map<String, Object>>) steps.get(7).getProperties().get(PropertyNames.OUTPUT_INFO); + (List<Map<String, Object>>) steps.get(8).getProperties().get(PropertyNames.OUTPUT_INFO); assertTrue( Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format")); - Step collectionToSingletonStep = steps.get(8); + Step collectionToSingletonStep = steps.get(9); assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); } @@ -1023,16 +1023,16 @@ public class DataflowPipelineTranslatorTest implements Serializable { assertAllStepOutputsHaveUniqueIds(job); List<Step> steps = job.getSteps(); - assertEquals(3, steps.size()); + assertEquals(4, steps.size()); @SuppressWarnings("unchecked") List<Map<String, Object>> toIsmRecordOutputs = - (List<Map<String, Object>>) steps.get(1).getProperties().get(PropertyNames.OUTPUT_INFO); + (List<Map<String, Object>>) steps.get(2).getProperties().get(PropertyNames.OUTPUT_INFO); assertTrue( Structs.getBoolean(Iterables.getOnlyElement(toIsmRecordOutputs), "use_indexed_format")); - Step collectionToSingletonStep = steps.get(2); + Step collectionToSingletonStep = steps.get(3); assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind()); } http://git-wip-us.apache.org/repos/asf/beam/blob/5e2593da/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java index 7cb8628..68e3e3c 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/TransformTranslator.java @@ -40,6 +40,7 @@ import org.apache.beam.runners.spark.metrics.MetricsAccumulator; import org.apache.beam.runners.spark.util.SideInputBroadcast; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.io.Read; import org.apache.beam.sdk.transforms.Combine; @@ -527,7 +528,11 @@ public final class TransformTranslator { Iterable<? extends WindowedValue<?>> iter = context.getWindowedValues(context.getInput(transform)); PCollectionView<WriteT> output = transform.getView(); - Coder<Iterable<WindowedValue<?>>> coderInternal = output.getCoderInternal(); + Coder<Iterable<WindowedValue<?>>> coderInternal = + (Coder) IterableCoder.of( + WindowedValue.getFullCoder( + output.getCoderInternal(), + output.getWindowingStrategyInternal().getWindowFn().windowCoder())); @SuppressWarnings("unchecked") Iterable<WindowedValue<?>> iterCast = (Iterable<WindowedValue<?>>) iter;