Rename DataflowAssert to PAssert
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d73ceab8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d73ceab8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d73ceab8 Branch: refs/heads/master Commit: d73ceab8bc0a3f8479882bd689015fd7f869758b Parents: 5f24cef Author: Thomas Groh <tg...@google.com> Authored: Thu Apr 7 15:53:15 2016 -0700 Committer: Davor Bonaci <da...@google.com> Committed: Fri Apr 8 13:55:01 2016 -0700 ---------------------------------------------------------------------- .../contrib/joinlibrary/InnerJoinTest.java | 10 +- .../contrib/joinlibrary/OuterLeftJoinTest.java | 10 +- .../contrib/joinlibrary/OuterRightJoinTest.java | 10 +- examples/java/pom.xml | 2 +- .../dataflow/examples/DebuggingWordCount.java | 12 +- .../cloud/dataflow/examples/WordCountTest.java | 4 +- .../examples/complete/AutoCompleteTest.java | 8 +- .../dataflow/examples/complete/TfIdfTest.java | 4 +- .../complete/TopWikipediaSessionsTest.java | 4 +- .../examples/cookbook/DeDupExampleTest.java | 6 +- .../examples/cookbook/JoinExamplesTest.java | 4 +- .../examples/cookbook/TriggerExampleTest.java | 4 +- .../examples/complete/game/GameStatsTest.java | 4 +- .../complete/game/HourlyTeamScoreTest.java | 4 +- .../examples/complete/game/UserScoreTest.java | 8 +- .../beam/runners/flink/FlinkTestPipeline.java | 6 +- .../streaming/StreamingTransformTranslator.java | 4 +- .../apache/beam/runners/spark/DeDupTest.java | 4 +- .../beam/runners/spark/SimpleWordCountTest.java | 4 +- .../apache/beam/runners/spark/TfIdfTest.java | 4 +- .../spark/translation/DoFnOutputTest.java | 4 +- .../translation/MultiOutputWordCountTest.java | 4 +- .../spark/translation/SerializationTest.java | 4 +- .../translation/WindowedWordCountTest.java | 8 +- .../streaming/FlattenStreamingTest.java | 8 +- .../streaming/KafkaStreamingTest.java | 8 +- .../streaming/SimpleStreamingWordCountTest.java | 8 +- .../utils/DataflowAssertStreaming.java | 42 - .../streaming/utils/PAssertStreaming.java | 42 + .../dataflow/sdk/testing/DataflowAssert.java | 826 ------------------- .../cloud/dataflow/sdk/testing/PAssert.java | 825 ++++++++++++++++++ .../sdk/testing/SerializableMatcher.java | 2 +- .../dataflow/sdk/testing/SourceTestUtils.java | 2 +- .../sdk/testing/TestDataflowPipelineRunner.java | 8 +- .../dataflow/sdk/testing/TestPipeline.java | 6 +- .../google/cloud/dataflow/sdk/PipelineTest.java | 10 +- .../dataflow/sdk/coders/AvroCoderTest.java | 4 +- .../sdk/coders/SerializableCoderTest.java | 4 +- .../cloud/dataflow/sdk/io/AvroIOTest.java | 6 +- .../io/BoundedReadFromUnboundedSourceTest.java | 4 +- .../dataflow/sdk/io/CompressedSourceTest.java | 10 +- .../dataflow/sdk/io/CountingInputTest.java | 12 +- .../dataflow/sdk/io/CountingSourceTest.java | 14 +- .../dataflow/sdk/io/FileBasedSourceTest.java | 6 +- .../cloud/dataflow/sdk/io/TextIOTest.java | 8 +- .../cloud/dataflow/sdk/io/XmlSourceTest.java | 8 +- .../sdk/io/bigtable/BigtableIOTest.java | 8 +- .../sdk/runners/dataflow/CustomSourcesTest.java | 6 +- .../runners/inprocess/InProcessCreateTest.java | 8 +- .../inprocess/InProcessPipelineRunnerTest.java | 4 +- .../sdk/testing/DataflowAssertTest.java | 327 -------- .../cloud/dataflow/sdk/testing/PAssertTest.java | 331 ++++++++ .../testing/TestDataflowPipelineRunnerTest.java | 24 +- .../transforms/ApproximateQuantilesTest.java | 10 +- .../sdk/transforms/ApproximateUniqueTest.java | 12 +- .../dataflow/sdk/transforms/CombineFnsTest.java | 12 +- .../dataflow/sdk/transforms/CombineTest.java | 66 +- .../dataflow/sdk/transforms/CountTest.java | 10 +- .../dataflow/sdk/transforms/CreateTest.java | 18 +- .../dataflow/sdk/transforms/FilterTest.java | 18 +- .../sdk/transforms/FlatMapElementsTest.java | 4 +- .../dataflow/sdk/transforms/FlattenTest.java | 22 +- .../dataflow/sdk/transforms/GroupByKeyTest.java | 8 +- .../cloud/dataflow/sdk/transforms/KeysTest.java | 6 +- .../dataflow/sdk/transforms/KvSwapTest.java | 6 +- .../sdk/transforms/MapElementsTest.java | 6 +- .../dataflow/sdk/transforms/ParDoTest.java | 66 +- .../dataflow/sdk/transforms/PartitionTest.java | 14 +- .../sdk/transforms/RemoveDuplicatesTest.java | 8 +- .../dataflow/sdk/transforms/SampleTest.java | 14 +- .../cloud/dataflow/sdk/transforms/TopTest.java | 32 +- .../dataflow/sdk/transforms/ValuesTest.java | 6 +- .../cloud/dataflow/sdk/transforms/ViewTest.java | 72 +- .../dataflow/sdk/transforms/WithKeysTest.java | 8 +- .../sdk/transforms/WithTimestampsTest.java | 10 +- .../sdk/transforms/join/CoGroupByKeyTest.java | 10 +- .../sdk/transforms/windowing/WindowingTest.java | 14 +- .../cloud/dataflow/sdk/util/ReshuffleTest.java | 14 +- .../sdk/values/PCollectionTupleTest.java | 8 +- .../sdk/transforms/CombineJava8Test.java | 10 +- .../sdk/transforms/FilterJava8Test.java | 10 +- .../transforms/FlatMapElementsJava8Test.java | 6 +- .../sdk/transforms/MapElementsJava8Test.java | 6 +- .../sdk/transforms/PartitionJava8Test.java | 8 +- .../transforms/RemoveDuplicatesJava8Test.java | 4 +- .../sdk/transforms/WithKeysJava8Test.java | 4 +- .../sdk/transforms/WithTimestampsJava8Test.java | 6 +- .../main/resources/archetype-resources/pom.xml | 2 +- .../src/main/java/DebuggingWordCount.java | 12 +- .../src/test/java/WordCountTest.java | 4 +- 90 files changed, 1628 insertions(+), 1625 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/InnerJoinTest.java ---------------------------------------------------------------------- diff --git a/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/InnerJoinTest.java b/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/InnerJoinTest.java index 067ffb6..c479a65 100644 --- a/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/InnerJoinTest.java +++ b/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/InnerJoinTest.java @@ -18,7 +18,7 @@ package com.google.cloud.dataflow.contrib.joinlibrary; import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.values.KV; @@ -67,7 +67,7 @@ public class InnerJoinTest { expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - DataflowAssert.that(output).containsInAnyOrder(expectedResult); + PAssert.that(output).containsInAnyOrder(expectedResult); p.run(); } @@ -88,7 +88,7 @@ public class InnerJoinTest { expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); - DataflowAssert.that(output).containsInAnyOrder(expectedResult); + PAssert.that(output).containsInAnyOrder(expectedResult); p.run(); } @@ -109,7 +109,7 @@ public class InnerJoinTest { expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); - DataflowAssert.that(output).containsInAnyOrder(expectedResult); + PAssert.that(output).containsInAnyOrder(expectedResult); p.run(); } @@ -127,7 +127,7 @@ public class InnerJoinTest { PCollection<KV<String, KV<Long, String>>> output = Join.innerJoin( leftCollection, rightCollection); - DataflowAssert.that(output).containsInAnyOrder(expectedResult); + PAssert.that(output).containsInAnyOrder(expectedResult); p.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterLeftJoinTest.java b/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterLeftJoinTest.java index 90e4606..35c8655 100644 --- a/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterLeftJoinTest.java +++ b/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterLeftJoinTest.java @@ -18,7 +18,7 @@ package com.google.cloud.dataflow.contrib.joinlibrary; import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.values.KV; @@ -68,7 +68,7 @@ public class OuterLeftJoinTest { expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - DataflowAssert.that(output).containsInAnyOrder(expectedResult); + PAssert.that(output).containsInAnyOrder(expectedResult); p.run(); } @@ -89,7 +89,7 @@ public class OuterLeftJoinTest { expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); - DataflowAssert.that(output).containsInAnyOrder(expectedResult); + PAssert.that(output).containsInAnyOrder(expectedResult); p.run(); } @@ -110,7 +110,7 @@ public class OuterLeftJoinTest { expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); - DataflowAssert.that(output).containsInAnyOrder(expectedResult); + PAssert.that(output).containsInAnyOrder(expectedResult); p.run(); } @@ -129,7 +129,7 @@ public class OuterLeftJoinTest { leftCollection, rightCollection, ""); expectedResult.add(KV.of("Key2", KV.of(4L, ""))); - DataflowAssert.that(output).containsInAnyOrder(expectedResult); + PAssert.that(output).containsInAnyOrder(expectedResult); p.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterRightJoinTest.java ---------------------------------------------------------------------- diff --git a/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterRightJoinTest.java b/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterRightJoinTest.java index fe785d7..9cc6785 100644 --- a/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterRightJoinTest.java +++ b/contrib/join-library/src/test/java/com/google/cloud/dataflow/contrib/joinlibrary/OuterRightJoinTest.java @@ -18,7 +18,7 @@ package com.google.cloud.dataflow.contrib.joinlibrary; import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.values.KV; @@ -68,7 +68,7 @@ public class OuterRightJoinTest { expectedResult.add(KV.of("Key1", KV.of(5L, "foo"))); expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); - DataflowAssert.that(output).containsInAnyOrder(expectedResult); + PAssert.that(output).containsInAnyOrder(expectedResult); p.run(); } @@ -89,7 +89,7 @@ public class OuterRightJoinTest { expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); expectedResult.add(KV.of("Key2", KV.of(4L, "gazonk"))); - DataflowAssert.that(output).containsInAnyOrder(expectedResult); + PAssert.that(output).containsInAnyOrder(expectedResult); p.run(); } @@ -110,7 +110,7 @@ public class OuterRightJoinTest { expectedResult.add(KV.of("Key2", KV.of(4L, "bar"))); expectedResult.add(KV.of("Key2", KV.of(6L, "bar"))); - DataflowAssert.that(output).containsInAnyOrder(expectedResult); + PAssert.that(output).containsInAnyOrder(expectedResult); p.run(); } @@ -129,7 +129,7 @@ public class OuterRightJoinTest { leftCollection, rightCollection, -1L); expectedResult.add(KV.of("Key3", KV.of(-1L, "bar"))); - DataflowAssert.that(output).containsInAnyOrder(expectedResult); + PAssert.that(output).containsInAnyOrder(expectedResult); p.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/examples/java/pom.xml ---------------------------------------------------------------------- diff --git a/examples/java/pom.xml b/examples/java/pom.xml index 2e158cb..d63ac23 100644 --- a/examples/java/pom.xml +++ b/examples/java/pom.xml @@ -369,7 +369,7 @@ <version>3.1.0</version> </dependency> - <!-- Hamcrest and JUnit are required dependencies of DataflowAssert, + <!-- Hamcrest and JUnit are required dependencies of PAssert, which is used in the main code of DebuggingWordCount example. --> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/examples/java/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java ---------------------------------------------------------------------- diff --git a/examples/java/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java b/examples/java/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java index 331d7c6..7134bca 100644 --- a/examples/java/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java +++ b/examples/java/src/main/java/com/google/cloud/dataflow/examples/DebuggingWordCount.java @@ -22,7 +22,7 @@ import com.google.cloud.dataflow.sdk.io.TextIO; import com.google.cloud.dataflow.sdk.options.Default; import com.google.cloud.dataflow.sdk.options.Description; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.transforms.Aggregator; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.ParDo; @@ -55,7 +55,7 @@ import java.util.regex.Pattern; * 1. Logging to Cloud Logging * 2. Controlling Dataflow worker log levels * 3. Creating a custom aggregator - * 4. Testing your Pipeline via DataflowAssert + * 4. Testing your Pipeline via PAssert * </pre> * * <p>To execute this pipeline locally, specify general pipeline configuration: @@ -178,13 +178,13 @@ public class DebuggingWordCount { .apply(ParDo.of(new FilterTextFn(options.getFilterPattern()))); /** - * Concept #4: DataflowAssert is a set of convenient PTransforms in the style of + * Concept #4: PAssert is a set of convenient PTransforms in the style of * Hamcrest's collection matchers that can be used when writing Pipeline level tests - * to validate the contents of PCollections. DataflowAssert is best used in unit tests + * to validate the contents of PCollections. PAssert is best used in unit tests * with small data sets but is demonstrated here as a teaching tool. * * <p>Below we verify that the set of filtered words matches our expected counts. Note - * that DataflowAssert does not provide any output and that successful completion of the + * that PAssert does not provide any output and that successful completion of the * Pipeline implies that the expectations were met. Learn more at * https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to test * your Pipeline and see {@link DebuggingWordCountTest} for an example unit test. @@ -192,7 +192,7 @@ public class DebuggingWordCount { List<KV<String, Long>> expectedResults = Arrays.asList( KV.of("Flourish", 3L), KV.of("stomach", 1L)); - DataflowAssert.that(filteredWords).containsInAnyOrder(expectedResults); + PAssert.that(filteredWords).containsInAnyOrder(expectedResults); p.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java index 8121d70..b1f4f27 100644 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java +++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java @@ -22,7 +22,7 @@ import com.google.cloud.dataflow.examples.WordCount.ExtractWordsFn; import com.google.cloud.dataflow.examples.WordCount.FormatAsTextFn; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -80,7 +80,7 @@ public class WordCountTest { PCollection<String> output = input.apply(new CountWords()) .apply(MapElements.via(new FormatAsTextFn())); - DataflowAssert.that(output).containsInAnyOrder(COUNTS_ARRAY); + PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY); p.run(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java index d29a039..a70aee1 100644 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java +++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java @@ -20,7 +20,7 @@ package com.google.cloud.dataflow.examples.complete; import com.google.cloud.dataflow.examples.complete.AutoComplete.CompletionCandidate; import com.google.cloud.dataflow.examples.complete.AutoComplete.ComputeTopCompletions; import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.DoFn; @@ -93,7 +93,7 @@ public class AutoCompleteTest implements Serializable { } })); - DataflowAssert.that(output).containsInAnyOrder( + PAssert.that(output).containsInAnyOrder( KV.of("a", parseList("apple:2", "apricot:1")), KV.of("ap", parseList("apple:2", "apricot:1")), KV.of("b", parseList("blackberry:3", "blueberry:2")), @@ -115,7 +115,7 @@ public class AutoCompleteTest implements Serializable { PCollection<KV<String, List<CompletionCandidate>>> output = input.apply(new ComputeTopCompletions(2, recursive)); - DataflowAssert.that(output).containsInAnyOrder( + PAssert.that(output).containsInAnyOrder( KV.of("x", parseList("x:3", "xy:2")), KV.of("xy", parseList("xy:2", "xyz:1")), KV.of("xyz", parseList("xyz:1"))); @@ -141,7 +141,7 @@ public class AutoCompleteTest implements Serializable { input.apply(Window.<String>into(SlidingWindows.of(new Duration(2)))) .apply(new ComputeTopCompletions(2, recursive)); - DataflowAssert.that(output).containsInAnyOrder( + PAssert.that(output).containsInAnyOrder( // Window [0, 2) KV.of("x", parseList("xA:2", "xB:1")), KV.of("xA", parseList("xA:2")), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java index 5f991ad..5989ce8 100644 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java +++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java @@ -19,7 +19,7 @@ package com.google.cloud.dataflow.examples.complete; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -61,7 +61,7 @@ public class TfIdfTest { .apply(Keys.<String>create()) .apply(RemoveDuplicates.<String>create()); - DataflowAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d")); + PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d")); pipeline.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java index 872b8e3..52f69b0 100644 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java +++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java @@ -19,7 +19,7 @@ package com.google.cloud.dataflow.examples.complete; import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -52,7 +52,7 @@ public class TopWikipediaSessionsTest { new TableRow().set("timestamp", 35 * 24 * 3600).set("contributor_username", "user3")))) .apply(new TopWikipediaSessions.ComputeTopSessions(1.0)); - DataflowAssert.that(output).containsInAnyOrder(Arrays.asList( + PAssert.that(output).containsInAnyOrder(Arrays.asList( "user1 : [1970-01-01T00:00:00.000Z..1970-01-01T01:00:02.000Z)" + " : 3 : 1970-01-01T00:00:00.000Z", "user3 : [1970-02-05T00:00:00.000Z..1970-02-05T01:00:00.000Z)" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java index ee1859f..6e9e3ed 100644 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java +++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java @@ -19,7 +19,7 @@ package com.google.cloud.dataflow.examples.cookbook; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -59,7 +59,7 @@ public class DeDupExampleTest { PCollection<String> output = input.apply(RemoveDuplicates.<String>create()); - DataflowAssert.that(output) + PAssert.that(output) .containsInAnyOrder("k1", "k5", "k2", "k3"); p.run(); } @@ -78,7 +78,7 @@ public class DeDupExampleTest { PCollection<String> output = input.apply(RemoveDuplicates.<String>create()); - DataflowAssert.that(output).empty(); + PAssert.that(output).empty(); p.run(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java index a7a0eda..259ce08 100644 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java +++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java @@ -21,7 +21,7 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.dataflow.examples.cookbook.JoinExamples.ExtractCountryInfoFn; import com.google.cloud.dataflow.examples.cookbook.JoinExamples.ExtractEventDataFn; import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -109,7 +109,7 @@ public class JoinExamplesTest { PCollection<TableRow> input2 = p.apply("CreateCC", Create.of(CC_ARRAY)); PCollection<String> output = JoinExamples.joinEvents(input1, input2); - DataflowAssert.that(output).containsInAnyOrder(JOINED_EVENTS); + PAssert.that(output).containsInAnyOrder(JOINED_EVENTS); p.run(); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java index e3f26ff..3664561 100644 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java +++ b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java @@ -21,7 +21,7 @@ import com.google.api.services.bigquery.model.TableRow; import com.google.cloud.dataflow.examples.cookbook.TriggerExample.ExtractFlowInfo; import com.google.cloud.dataflow.examples.cookbook.TriggerExample.TotalFlow; import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -114,7 +114,7 @@ public class TriggerExampleTest { PCollection<TableRow> results = totalFlow.apply(ParDo.of(new FormatResults())); - DataflowAssert.that(results).containsInAnyOrder(OUT_ROW_1, OUT_ROW_2); + PAssert.that(results).containsInAnyOrder(OUT_ROW_1, OUT_ROW_2); pipeline.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java b/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java index e1a5edb..5832c89 100644 --- a/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java +++ b/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/GameStatsTest.java @@ -19,7 +19,7 @@ package com.google.cloud.dataflow.examples.complete.game; import com.google.cloud.dataflow.examples.complete.game.GameStats.CalculateSpammyUsers; import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -69,7 +69,7 @@ public class GameStatsTest implements Serializable { PCollection<KV<String, Integer>> output = input.apply(new CalculateSpammyUsers()); // Check the set of spammers. - DataflowAssert.that(output).containsInAnyOrder(SPAMMERS); + PAssert.that(output).containsInAnyOrder(SPAMMERS); p.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java b/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java index df9768a..3fd2c57 100644 --- a/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java +++ b/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/HourlyTeamScoreTest.java @@ -21,7 +21,7 @@ import com.google.cloud.dataflow.examples.complete.game.UserScore.GameActionInfo import com.google.cloud.dataflow.examples.complete.game.UserScore.ParseEventFn; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -104,7 +104,7 @@ public class HourlyTeamScoreTest implements Serializable { .via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) .withOutputType(new TypeDescriptor<KV<String, Integer>>() {})); - DataflowAssert.that(output).containsInAnyOrder(FILTERED_EVENTS); + PAssert.that(output).containsInAnyOrder(FILTERED_EVENTS); p.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java ---------------------------------------------------------------------- diff --git a/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java index 7f25dae..b907ae7 100644 --- a/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java +++ b/examples/java8/src/test/java/com/google/cloud/dataflow/examples/complete/game/UserScoreTest.java @@ -22,7 +22,7 @@ import com.google.cloud.dataflow.examples.complete.game.UserScore.GameActionInfo import com.google.cloud.dataflow.examples.complete.game.UserScore.ParseEventFn; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.testing.RunnableOnService; import com.google.cloud.dataflow.sdk.testing.TestPipeline; import com.google.cloud.dataflow.sdk.transforms.Create; @@ -110,7 +110,7 @@ public class UserScoreTest implements Serializable { .apply("ExtractUserScore", new ExtractAndSumScore("user")); // Check the user score sums. - DataflowAssert.that(output).containsInAnyOrder(USER_SUMS); + PAssert.that(output).containsInAnyOrder(USER_SUMS); p.run(); } @@ -129,7 +129,7 @@ public class UserScoreTest implements Serializable { .apply("ExtractTeamScore", new ExtractAndSumScore("team")); // Check the team score sums. - DataflowAssert.that(output).containsInAnyOrder(TEAM_SUMS); + PAssert.that(output).containsInAnyOrder(TEAM_SUMS); p.run(); } @@ -148,7 +148,7 @@ public class UserScoreTest implements Serializable { MapElements.via((GameActionInfo gInfo) -> KV.of(gInfo.getUser(), gInfo.getScore())) .withOutputType(new TypeDescriptor<KV<String, Integer>>() {})); - DataflowAssert.that(extract).empty(); + PAssert.that(extract).empty(); p.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java index aadda24..86a0cd0 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java @@ -31,7 +31,7 @@ public class FlinkTestPipeline extends Pipeline { /** * Creates and returns a new test pipeline for batch execution. * - * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call + * <p> Use {@link com.google.cloud.dataflow.sdk.testing.PAssert} to add tests, then call * {@link Pipeline#run} to execute the pipeline and check the tests. */ public static FlinkTestPipeline createForBatch() { @@ -41,7 +41,7 @@ public class FlinkTestPipeline extends Pipeline { /** * Creates and returns a new test pipeline for streaming execution. * - * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call + * <p> Use {@link com.google.cloud.dataflow.sdk.testing.PAssert} to add tests, then call * {@link Pipeline#run} to execute the pipeline and check the tests. * * @return The Test Pipeline @@ -53,7 +53,7 @@ public class FlinkTestPipeline extends Pipeline { /** * Creates and returns a new test pipeline for streaming or batch execution. * - * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to add tests, then call + * <p> Use {@link com.google.cloud.dataflow.sdk.testing.PAssert} to add tests, then call * {@link Pipeline#run} to execute the pipeline and check the tests. * * @param streaming <code>True</code> for streaming mode, <code>False</code> for batch. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java index 349bb7c..fdab667 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/streaming/StreamingTransformTranslator.java @@ -136,7 +136,7 @@ public final class StreamingTransformTranslator { // fake create as an input // creates a stream with a single batch containing a single null element // to invoke following transformations once - // to support DataflowAssert + // to support PAssert sec.setDStreamFromQueue(transform, Collections.<Iterable<Void>>singletonList(Collections.singletonList((Void) null)), (Coder<Void>) coder); @@ -195,7 +195,7 @@ public final class StreamingTransformTranslator { .transform(new RDDTransform<>(sec, rddEvaluator, transform))); } else { // if the transformation requires direct access to RDD (not in stream) - // this is used for "fake" transformations like with DataflowAssert + // this is used for "fake" transformations like with PAssert rddEvaluator.evaluate(transform, context); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java index 4a080e8..87f6d4c 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/DeDupTest.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.spark; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -52,7 +52,7 @@ public class DeDupTest { PCollection<String> input = p.apply(Create.of(LINES)).setCoder(StringUtf8Coder.of()); PCollection<String> output = input.apply(RemoveDuplicates.<String>create()); - DataflowAssert.that(output).containsInAnyOrder(EXPECTED_SET); + PAssert.that(output).containsInAnyOrder(EXPECTED_SET); EvaluationResult res = SparkPipelineRunner.create().run(p); res.close(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index e32b39a..3946e12 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.spark; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.transforms.*; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -50,7 +50,7 @@ public class SimpleWordCountTest { .of()); PCollection<String> output = inputWords.apply(new CountWords()); - DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); + PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); EvaluationResult res = SparkPipelineRunner.create().run(p); res.close(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java index 51ce3a3..24183fb 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/TfIdfTest.java @@ -22,7 +22,7 @@ import com.google.cloud.dataflow.examples.complete.TfIdf; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.Keys; import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; @@ -55,7 +55,7 @@ public class TfIdfTest { .apply(Keys.<String>create()) .apply(RemoveDuplicates.<String>create()); - DataflowAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d")); + PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d")); EvaluationResult res = SparkPipelineRunner.create().run(pipeline); res.close(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java index a9779e6..5f15a35 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/DoFnOutputTest.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.spark.translation; import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.ParDo; @@ -56,7 +56,7 @@ public class DoFnOutputTest implements Serializable { } })); - DataflowAssert.that(output).containsInAnyOrder("start", "a", "finish"); + PAssert.that(output).containsInAnyOrder("start", "a", "finish"); EvaluationResult res = SparkPipelineRunner.create().run(pipeline); res.close(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java index 974467f..52dcf30 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/MultiOutputWordCountTest.java @@ -22,7 +22,7 @@ import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; import com.google.cloud.dataflow.sdk.runners.AggregatorValues; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.transforms.*; import com.google.cloud.dataflow.sdk.values.*; import com.google.common.collect.ImmutableSet; @@ -61,7 +61,7 @@ public class MultiOutputWordCountTest { ApproximateUnique.<KV<String, Long>>globally(16)); EvaluationResult res = SparkPipelineRunner.create().run(p); - DataflowAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn()))) + PAssert.that(luc.get(lowerCnts).apply(ParDo.of(new FormatCountsFn()))) .containsInAnyOrder(EXPECTED_LOWER_COUNTS); Iterable<KV<String, Long>> actualUpper = res.get(luc.get(upperCnts)); Assert.assertEquals("Here", actualUpper.iterator().next().getKey()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java index b378795..0978766 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/SerializationTest.java @@ -22,7 +22,7 @@ import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.AtomicCoder; import com.google.cloud.dataflow.sdk.coders.Coder; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.transforms.*; import com.google.cloud.dataflow.sdk.values.KV; import com.google.cloud.dataflow.sdk.values.PCollection; @@ -118,7 +118,7 @@ public class SerializationTest { p.apply(Create.of(WORDS).withCoder(StringHolderUtf8Coder.of())); PCollection<StringHolder> output = inputWords.apply(new CountWords()); - DataflowAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); + PAssert.that(output).containsInAnyOrder(EXPECTED_COUNT_SET); EvaluationResult res = SparkPipelineRunner.create().run(p); res.close(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java index 9fac9c6..9a29171 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/WindowedWordCountTest.java @@ -21,7 +21,7 @@ package org.apache.beam.runners.spark.translation; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.transforms.Create; import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows; @@ -59,7 +59,7 @@ public class WindowedWordCountTest { PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords()); - DataflowAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET); + PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SEPARATE_COUNT_SET); EvaluationResult res = SparkPipelineRunner.create().run(p); res.close(); @@ -78,7 +78,7 @@ public class WindowedWordCountTest { PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords()); - DataflowAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET); + PAssert.that(output).containsInAnyOrder(EXPECTED_FIXED_SAME_COUNT_SET); EvaluationResult res = SparkPipelineRunner.create().run(p); res.close(); @@ -99,7 +99,7 @@ public class WindowedWordCountTest { PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords()); - DataflowAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET); + PAssert.that(output).containsInAnyOrder(EXPECTED_SLIDING_COUNT_SET); EvaluationResult res = SparkPipelineRunner.create().run(p); res.close(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java index a3eb301..ed51edf 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/FlattenStreamingTest.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.spark.translation.streaming; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.transforms.Flatten; import com.google.cloud.dataflow.sdk.transforms.View; import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; @@ -31,7 +31,7 @@ import org.apache.beam.runners.spark.SparkStreamingPipelineOptions; import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineRunner; -import org.apache.beam.runners.spark.translation.streaming.utils.DataflowAssertStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; import org.joda.time.Duration; import org.junit.Test; @@ -76,13 +76,13 @@ public class FlattenStreamingTest { PCollectionList<String> list = PCollectionList.of(windowedW1).and(windowedW2); PCollection<String> union = list.apply(Flatten.<String>pCollections()); - DataflowAssert.thatIterable(union.apply(View.<String>asIterable())) + PAssert.thatIterable(union.apply(View.<String>asIterable())) .containsInAnyOrder(EXPECTED_UNION); EvaluationResult res = SparkPipelineRunner.create(options).run(p); res.close(); - DataflowAssertStreaming.assertNoFailures(res); + PAssertStreaming.assertNoFailures(res); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java index 628fe86..4499b56 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.java @@ -20,7 +20,7 @@ package org.apache.beam.runners.spark.translation.streaming; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.KvCoder; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.transforms.DoFn; import com.google.cloud.dataflow.sdk.transforms.ParDo; import com.google.cloud.dataflow.sdk.transforms.View; @@ -35,7 +35,7 @@ import org.apache.beam.runners.spark.SparkStreamingPipelineOptions; import org.apache.beam.runners.spark.io.KafkaIO; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SparkPipelineRunner; -import org.apache.beam.runners.spark.translation.streaming.utils.DataflowAssertStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster; import org.apache.kafka.clients.producer.KafkaProducer; @@ -115,13 +115,13 @@ public class KafkaStreamingTest { PCollection<String> formattedKV = windowedWords.apply(ParDo.of(new FormatKVFn())); - DataflowAssert.thatIterable(formattedKV.apply(View.<String>asIterable())) + PAssert.thatIterable(formattedKV.apply(View.<String>asIterable())) .containsInAnyOrder(EXPECTED); EvaluationResult res = SparkPipelineRunner.create(options).run(p); res.close(); - DataflowAssertStreaming.assertNoFailures(res); + PAssertStreaming.assertNoFailures(res); } @AfterClass http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java index b591510..ed02286 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/SimpleStreamingWordCountTest.java @@ -19,7 +19,7 @@ package org.apache.beam.runners.spark.translation.streaming; import com.google.cloud.dataflow.sdk.Pipeline; import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.DataflowAssert; +import com.google.cloud.dataflow.sdk.testing.PAssert; import com.google.cloud.dataflow.sdk.transforms.View; import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; import com.google.cloud.dataflow.sdk.transforms.windowing.Window; @@ -31,7 +31,7 @@ import org.apache.beam.runners.spark.io.CreateStream; import org.apache.beam.runners.spark.EvaluationResult; import org.apache.beam.runners.spark.SimpleWordCountTest; import org.apache.beam.runners.spark.SparkPipelineRunner; -import org.apache.beam.runners.spark.translation.streaming.utils.DataflowAssertStreaming; +import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming; import org.joda.time.Duration; import org.junit.Test; @@ -66,12 +66,12 @@ public class SimpleStreamingWordCountTest { PCollection<String> output = windowedWords.apply(new SimpleWordCountTest.CountWords()); - DataflowAssert.thatIterable(output.apply(View.<String>asIterable())) + PAssert.thatIterable(output.apply(View.<String>asIterable())) .containsInAnyOrder(EXPECTED_COUNT_SET); EvaluationResult res = SparkPipelineRunner.create(options).run(p); res.close(); - DataflowAssertStreaming.assertNoFailures(res); + PAssertStreaming.assertNoFailures(res); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/DataflowAssertStreaming.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/DataflowAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/DataflowAssertStreaming.java deleted file mode 100644 index 30673dd..0000000 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/DataflowAssertStreaming.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.spark.translation.streaming.utils; - -import org.apache.beam.runners.spark.EvaluationResult; - -import org.junit.Assert; - -/** - * Since DataflowAssert doesn't propagate assert exceptions, use Aggregators to assert streaming - * success/failure counters. - */ -public final class DataflowAssertStreaming { - /** - * Copied aggregator names from {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} - */ - static final String SUCCESS_COUNTER = "DataflowAssertSuccess"; - static final String FAILURE_COUNTER = "DataflowAssertFailure"; - - private DataflowAssertStreaming() { - } - - public static void assertNoFailures(EvaluationResult res) { - int failures = res.getAggregatorValue(FAILURE_COUNTER, Integer.class); - Assert.assertEquals("Found " + failures + " failures, see the log for details", 0, failures); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d73ceab8/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java new file mode 100644 index 0000000..fe0f2a8 --- /dev/null +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/utils/PAssertStreaming.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.spark.translation.streaming.utils; + +import org.apache.beam.runners.spark.EvaluationResult; + +import org.junit.Assert; + +/** + * Since PAssert doesn't propagate assert exceptions, use Aggregators to assert streaming + * success/failure counters. + */ +public final class PAssertStreaming { + /** + * Copied aggregator names from {@link com.google.cloud.dataflow.sdk.testing.PAssert} + */ + static final String SUCCESS_COUNTER = "PAssertSuccess"; + static final String FAILURE_COUNTER = "PAssertFailure"; + + private PAssertStreaming() { + } + + public static void assertNoFailures(EvaluationResult res) { + int failures = res.getAggregatorValue(FAILURE_COUNTER, Integer.class); + Assert.assertEquals("Found " + failures + " failures, see the log for details", 0, failures); + } +}