This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push: new 1576a43 Add a test for the most simple possible Combine 1576a43 is described below commit 1576a434fae10b525e8d0c8f70060acbe39522c4 Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Tue Feb 12 17:36:55 2019 +0100 Add a test for the most simple possible Combine --- .../translation/batch/CombineTest.java | 59 ++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java new file mode 100644 index 0000000..944ab09 --- /dev/null +++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java @@ -0,0 +1,59 @@ +package org.apache.beam.runners.spark.structuredstreaming.translation.batch; + +import java.io.Serializable; +import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions; +import org.apache.beam.runners.spark.structuredstreaming.SparkRunner; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Count; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Test class for Beam to Spark {@link org.apache.beam.sdk.transforms.Combine} translation. The single test applies a global {@code CombineFn} that sums the integers 1 to 10 and runs the pipeline on the {@code SparkRunner}; it does not assert on the combined output. 
*/ +@RunWith(JUnit4.class) +public class CombineTest implements Serializable { + private static Pipeline pipeline; + + @BeforeClass + public static void beforeClass() { + PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class); + options.setRunner(SparkRunner.class); + pipeline = Pipeline.create(options); + } + + @Test + public void testCombine() { + PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + input.apply(Combine.globally(new Combine.CombineFn<Integer, Integer, Integer>() { + + @Override public Integer createAccumulator() { + return 0; + } + + @Override public Integer addInput(Integer accumulator, Integer input) { + return accumulator + input; + } + + @Override public Integer mergeAccumulators(Iterable<Integer> accumulators) { + Integer result = 0; + for (Integer value : accumulators){ + result += value; + } + return result; + } + + @Override public Integer extractOutput(Integer accumulator) { + return accumulator; + } + })); + pipeline.run(); + } +}