This is an automated email from the ASF dual-hosted git repository. aromanenko pushed a commit to branch spark-runner_structured-streaming in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/spark-runner_structured-streaming by this push:
     new c0061ab  Added "testTwoPardoInRow"
c0061ab is described below

commit c0061ab8ac8c896af5635a7ecca94fd255ec4aae
Author: Alexey Romanenko <aromanenko....@gmail.com>
AuthorDate: Tue Feb 12 17:06:10 2019 +0100

    Added "testTwoPardoInRow"
---
 .../translation/batch/ParDoTest.java               | 27 ++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
index 88e862f..48350df 100644
--- a/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
+++ b/runners/spark-structured-streaming/src/test/java/org/apache/beam/runners/spark/structuredstreaming/translation/batch/ParDoTest.java
@@ -56,4 +56,31 @@ public class ParDoTest implements Serializable {
                 }));
     pipeline.run();
   }
+
+  @Test
+  public void testTwoPardoInRow() {
+    PCollection<Integer> input = pipeline.apply(Create.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
+    input
+        .apply(
+            ParDo.of(
+                new DoFn<Integer, Integer>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext context) {
+                    Integer val = context.element() + 1;
+                    context.output(val);
+                    System.out.println("ParDo1: val = " + val);
+                  }
+                }))
+        .apply(
+            ParDo.of(
+                new DoFn<Integer, Integer>() {
+                  @ProcessElement
+                  public void processElement(ProcessContext context) {
+                    Integer val = context.element() + 1;
+                    context.output(val);
+                    System.out.println("ParDo2: val = " + val);
+                  }
+                }));
+    pipeline.run();
+  }
 }