Closes #1186
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e92157b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e92157b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e92157b3

Branch: refs/heads/master
Commit: e92157b37fefa0931a63191e24b06fd8df2f7a32
Parents: 8827ccf 1db4ff6
Author: Thomas Weise <t...@apache.org>
Authored: Wed Oct 26 13:21:18 2016 -0700
Committer: Thomas Weise <t...@apache.org>
Committed: Wed Oct 26 13:21:18 2016 -0700

----------------------------------------------------------------------
 .../apex/translators/GroupByKeyTranslator.java | 3 +-
 .../translators/ParDoBoundMultiTranslator.java | 4 +-
 .../apex/translators/ParDoBoundTranslator.java | 4 +-
 .../apex/translators/TranslationContext.java | 10 +
 .../functions/ApexGroupByKeyOperator.java | 12 +-
 .../functions/ApexParDoOperator.java | 11 +-
 .../translators/utils/ApexStateInternals.java | 438 +++++++++++++++++++
 .../translators/ParDoBoundTranslatorTest.java | 62 ++-
 .../utils/ApexStateInternalsTest.java | 361 +++++++++++++++
 9 files changed, 883 insertions(+), 22 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e92157b3/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/ParDoBoundMultiTranslator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e92157b3/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --cc runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index 72b4299,9ea4233..6f50398
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++
b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java @@@ -198,99 -200,51 +208,133 @@@ public class ParDoBoundTranslatorTest .apply(Sum.integersGlobally().asSingletonView()); ApexParDoOperator<Integer, Integer> operator = new ApexParDoOperator<>(options, - new Add(0), new TupleTag<Integer>(), TupleTagList.empty().getAll(), + new Add(singletonView), new TupleTag<Integer>(), TupleTagList.empty().getAll(), WindowingStrategy.globalDefault(), Collections.<PCollectionView<?>>singletonList(singletonView), - coder); + coder, + new ApexStateInternals.ApexStateInternalsFactory<Void>() + ); operator.setup(null); operator.beginWindow(0); - WindowedValue<Integer> wv = WindowedValue.valueInGlobalWindow(0); - operator.input.process(ApexStreamTuple.DataTuple.of(wv)); - operator.input.process(ApexStreamTuple.WatermarkTuple.<WindowedValue<Integer>>of(0)); - operator.endWindow(); - Assert.assertNotNull("Serialization", KryoCloneUtils.cloneObject(operator)); + WindowedValue<Integer> wv1 = WindowedValue.valueInGlobalWindow(1); + WindowedValue<Iterable<?>> sideInput = WindowedValue.<Iterable<?>>valueInGlobalWindow( + Lists.<Integer>newArrayList(22)); + operator.input.process(ApexStreamTuple.DataTuple.of(wv1)); // pushed back input + final List<Object> results = Lists.newArrayList(); + Sink<Object> sink = new Sink<Object>() { + @Override + public void put(Object tuple) { + results.add(tuple); + } + @Override + public int getCount(boolean reset) { + return 0; + } + }; + + // verify pushed back input checkpointing + Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator)); + operator.output.setSink(sink); + operator.setup(null); + operator.beginWindow(1); + WindowedValue<Integer> wv2 = WindowedValue.valueInGlobalWindow(2); + operator.sideInput1.process(ApexStreamTuple.DataTuple.of(sideInput)); + Assert.assertEquals("number outputs", 1, results.size()); + Assert.assertEquals("result", 
WindowedValue.valueInGlobalWindow(23), + ((ApexStreamTuple.DataTuple) results.get(0)).getValue()); + + // verify side input checkpointing + results.clear(); + Assert.assertNotNull("Serialization", operator = KryoCloneUtils.cloneObject(operator)); + operator.output.setSink(sink); + operator.setup(null); + operator.beginWindow(2); + operator.input.process(ApexStreamTuple.DataTuple.of(wv2)); + Assert.assertEquals("number outputs", 1, results.size()); + Assert.assertEquals("result", WindowedValue.valueInGlobalWindow(24), + ((ApexStreamTuple.DataTuple) results.get(0)).getValue()); } + + @Test + public void testMultiOutputParDoWithSideInputs() throws Exception { + ApexPipelineOptions options = PipelineOptionsFactory.create().as(ApexPipelineOptions.class); + options.setRunner(ApexRunner.class); // non-blocking run + Pipeline pipeline = Pipeline.create(options); + + List<Integer> inputs = Arrays.asList(3, -42, 666); + final TupleTag<String> mainOutputTag = new TupleTag<String>("main"); + final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput"); + + PCollectionView<Integer> sideInput1 = pipeline + .apply("CreateSideInput1", Create.of(11)) + .apply("ViewSideInput1", View.<Integer>asSingleton()); + PCollectionView<Integer> sideInputUnread = pipeline + .apply("CreateSideInputUnread", Create.of(-3333)) + .apply("ViewSideInputUnread", View.<Integer>asSingleton()); + PCollectionView<Integer> sideInput2 = pipeline + .apply("CreateSideInput2", Create.of(222)) + .apply("ViewSideInput2", View.<Integer>asSingleton()); + + PCollectionTuple outputs = pipeline + .apply(Create.of(inputs)) + .apply(ParDo.withSideInputs(sideInput1) + .withSideInputs(sideInputUnread) + .withSideInputs(sideInput2) + .withOutputTags(mainOutputTag, TupleTagList.of(sideOutputTag)) + .of(new TestMultiOutputWithSideInputsFn( + Arrays.asList(sideInput1, sideInput2), + Arrays.<TupleTag<String>>asList()))); + + outputs.get(mainOutputTag).apply(ParDo.of(new EmbeddedCollector())); + ApexRunnerResult result 
= (ApexRunnerResult) pipeline.run(); + + HashSet<String> expected = Sets.newHashSet("processing: 3: [11, 222]", + "processing: -42: [11, 222]", "processing: 666: [11, 222]"); + long timeout = System.currentTimeMillis() + TIMEOUT_MILLIS; + while (System.currentTimeMillis() < timeout) { + if (EmbeddedCollector.RESULTS.containsAll(expected)) { + break; + } + LOG.info("Waiting for expected results."); + Thread.sleep(SLEEP_MILLIS); + } + result.cancel(); + Assert.assertEquals(Sets.newHashSet(expected), EmbeddedCollector.RESULTS); + } + + private static class TestMultiOutputWithSideInputsFn extends OldDoFn<Integer, String> { + private static final long serialVersionUID = 1L; + + final List<PCollectionView<Integer>> sideInputViews = new ArrayList<>(); + final List<TupleTag<String>> sideOutputTupleTags = new ArrayList<>(); + + public TestMultiOutputWithSideInputsFn(List<PCollectionView<Integer>> sideInputViews, + List<TupleTag<String>> sideOutputTupleTags) { + this.sideInputViews.addAll(sideInputViews); + this.sideOutputTupleTags.addAll(sideOutputTupleTags); + } + + @Override + public void processElement(ProcessContext c) throws Exception { + outputToAllWithSideInputs(c, "processing: " + c.element()); + } + + private void outputToAllWithSideInputs(ProcessContext c, String value) { + if (!sideInputViews.isEmpty()) { + List<Integer> sideInputValues = new ArrayList<>(); + for (PCollectionView<Integer> sideInputView : sideInputViews) { + sideInputValues.add(c.sideInput(sideInputView)); + } + value += ": " + sideInputValues; + } + c.output(value); + for (TupleTag<String> sideOutputTupleTag : sideOutputTupleTags) { + c.sideOutput(sideOutputTupleTag, + sideOutputTupleTag.getId() + ": " + value); + } + } + + } + }