This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch spark-runner_structured-streaming
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to 
refs/heads/spark-runner_structured-streaming by this push:
     new 1576a43  Add a test for the most simple possible Combine
1576a43 is described below

commit 1576a434fae10b525e8d0c8f70060acbe39522c4
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Tue Feb 12 17:36:55 2019 +0100

    Add a test for the most simple possible Combine
---
 .../translation/batch/CombineTest.java             | 59 ++++++++++++++++++++++
 1 file changed, 59 insertions(+)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java
new file mode 100644
index 0000000..944ab09
--- /dev/null
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/CombineTest.java
@@ -0,0 +1,59 @@
+package org.apache.beam.runners.spark.structuredstreaming.translation.batch;
+
+import java.io.Serializable;
+import org.apache.beam.runners.spark.structuredstreaming.SparkPipelineOptions;
+import org.apache.beam.runners.spark.structuredstreaming.SparkRunner;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Test class for beam to spark {@link org.apache.beam.sdk.transforms.Combine} translation. */
+@RunWith(JUnit4.class)
+public class CombineTest implements Serializable {
+  private static Pipeline pipeline;
+
+  @BeforeClass
+  public static void beforeClass() {
+    PipelineOptions options = PipelineOptionsFactory.create().as(SparkPipelineOptions.class);
+    options.setRunner(SparkRunner.class);
+    pipeline = Pipeline.create(options);
+  }
+
+  @Test
+  public void testCombine() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+    input.apply(Combine.globally(new Combine.CombineFn<Integer, Integer, Integer>() {
+
+      @Override public Integer createAccumulator() {
+        return 0;
+      }
+
+      @Override public Integer addInput(Integer accumulator, Integer input) {
+        return accumulator + input;
+      }
+
+      @Override public Integer mergeAccumulators(Iterable<Integer> accumulators) {
+        Integer result = 0;
+        for (Integer value : accumulators) {
+          result += value;
+        }
+        return result;
+      }
+
+      @Override public Integer extractOutput(Integer accumulator) {
+        return accumulator;
+      }
+    }));
+    pipeline.run();
+  }
+}
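A possible follow-up, not part of this commit: the test above only checks that the pipeline runs and never inspects the combined value. Below is a minimal sketch of an assertion-based variant using the standard Beam testing utility org.apache.beam.sdk.testing.PAssert and the built-in Sum.integersGlobally() transform, which is equivalent to the hand-written summing CombineFn in the diff. The method name testCombineWithAssertion is made up here, and it is an assumption that PAssert already works on this runner at this stage of the branch.

    // Hypothetical additional test method for CombineTest (not in the commit).
    // Extra imports needed: org.apache.beam.sdk.testing.PAssert,
    // org.apache.beam.sdk.transforms.Sum.
    @Test
    public void testCombineWithAssertion() {
      // Same input as the commit's test; Sum.integersGlobally() is the built-in
      // equivalent of the anonymous summing CombineFn above.
      PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
      PCollection<Integer> output = input.apply(Sum.integersGlobally());
      // 1 + 2 + ... + 10 = 55, so the globally combined PCollection holds a single 55.
      PAssert.that(output).containsInAnyOrder(55);
      pipeline.run();
    }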
