http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java deleted file mode 100644 index b1f4f27..0000000 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/WordCountTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples; - -import com.google.cloud.dataflow.examples.WordCount.CountWords; -import com.google.cloud.dataflow.examples.WordCount.ExtractWordsFn; -import com.google.cloud.dataflow.examples.WordCount.FormatAsTextFn; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.PAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFnTester; -import com.google.cloud.dataflow.sdk.transforms.MapElements; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.hamcrest.CoreMatchers; -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Arrays; -import java.util.List; - -/** - * Tests of WordCount. - */ -@RunWith(JUnit4.class) -public class WordCountTest { - - /** Example test that tests a specific DoFn. */ - @Test - public void testExtractWordsFn() { - DoFnTester<String, String> extractWordsFn = - DoFnTester.of(new ExtractWordsFn()); - - Assert.assertThat(extractWordsFn.processBatch(" some input words "), - CoreMatchers.hasItems("some", "input", "words")); - Assert.assertThat(extractWordsFn.processBatch(" "), - CoreMatchers.<String>hasItems()); - Assert.assertThat(extractWordsFn.processBatch(" some ", " input", " words"), - CoreMatchers.hasItems("some", "input", "words")); - } - - static final String[] WORDS_ARRAY = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - - static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); - - static final String[] COUNTS_ARRAY = new String[] { - "hi: 5", "there: 1", "sue: 2", "bob: 2"}; - - /** Example test that tests a PTransform by using an in-memory input and inspecting the output. */ - @Test - @Category(RunnableOnService.class) - public void testCountWords() throws Exception { - Pipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); - - PCollection<String> output = input.apply(new CountWords()) - .apply(MapElements.via(new FormatAsTextFn())); - - PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY); - p.run(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java deleted file mode 100644 index a70aee1..0000000 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/AutoCompleteTest.java +++ /dev/null @@ -1,182 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.complete; - -import com.google.cloud.dataflow.examples.complete.AutoComplete.CompletionCandidate; -import com.google.cloud.dataflow.examples.complete.AutoComplete.ComputeTopCompletions; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.PAssert; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.Filter; -import com.google.cloud.dataflow.sdk.transforms.PTransform; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; -import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TimestampedValue; - -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.List; - -/** - * Tests of AutoComplete. - */ -@RunWith(Parameterized.class) -public class AutoCompleteTest implements Serializable { - private boolean recursive; - - public AutoCompleteTest(Boolean recursive) { - this.recursive = recursive; - } - - @Parameterized.Parameters - public static Collection<Object[]> testRecursive() { - return Arrays.asList(new Object[][] { - { true }, - { false } - }); - } - - @Test - public void testAutoComplete() { - List<String> words = Arrays.asList( - "apple", - "apple", - "apricot", - "banana", - "blackberry", - "blackberry", - "blackberry", - "blueberry", - "blueberry", - "cherry"); - - Pipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of(words)); - - PCollection<KV<String, List<CompletionCandidate>>> output = - input.apply(new ComputeTopCompletions(2, recursive)) - .apply(Filter.byPredicate( - new SerializableFunction<KV<String, List<CompletionCandidate>>, Boolean>() { - @Override - public Boolean apply(KV<String, List<CompletionCandidate>> element) { - return element.getKey().length() <= 2; - } - })); - - PAssert.that(output).containsInAnyOrder( - KV.of("a", parseList("apple:2", "apricot:1")), - KV.of("ap", parseList("apple:2", "apricot:1")), - KV.of("b", parseList("blackberry:3", "blueberry:2")), - KV.of("ba", parseList("banana:1")), - KV.of("bl", parseList("blackberry:3", "blueberry:2")), - KV.of("c", parseList("cherry:1")), - KV.of("ch", parseList("cherry:1"))); - p.run(); - } - - @Test - public void testTinyAutoComplete() { - List<String> words = Arrays.asList("x", "x", "x", "xy", "xy", "xyz"); - - Pipeline p = TestPipeline.create(); - - PCollection<String> input = p.apply(Create.of(words)); - - PCollection<KV<String, List<CompletionCandidate>>> output = - input.apply(new ComputeTopCompletions(2, recursive)); - - PAssert.that(output).containsInAnyOrder( - KV.of("x", parseList("x:3", "xy:2")), - KV.of("xy", parseList("xy:2", "xyz:1")), - KV.of("xyz", parseList("xyz:1"))); - p.run(); - } - - @Test - public void testWindowedAutoComplete() { - List<TimestampedValue<String>> words = Arrays.asList( - TimestampedValue.of("xA", new Instant(1)), - TimestampedValue.of("xA", new Instant(1)), - TimestampedValue.of("xB", new Instant(1)), - TimestampedValue.of("xB", new Instant(2)), - TimestampedValue.of("xB", new Instant(2))); - - Pipeline p = TestPipeline.create(); - - PCollection<String> input = p - .apply(Create.of(words)) - .apply(new ReifyTimestamps<String>()); - - PCollection<KV<String, List<CompletionCandidate>>> output = - input.apply(Window.<String>into(SlidingWindows.of(new Duration(2)))) - .apply(new ComputeTopCompletions(2, recursive)); - - PAssert.that(output).containsInAnyOrder( - // Window [0, 2) - KV.of("x", parseList("xA:2", "xB:1")), - KV.of("xA", parseList("xA:2")), - KV.of("xB", parseList("xB:1")), - - // Window [1, 3) - KV.of("x", parseList("xB:3", "xA:2")), - KV.of("xA", parseList("xA:2")), - KV.of("xB", parseList("xB:3")), - - // Window [2, 3) - KV.of("x", parseList("xB:2")), - KV.of("xB", parseList("xB:2"))); - p.run(); - } - - private static List<CompletionCandidate> parseList(String... entries) { - List<CompletionCandidate> all = new ArrayList<>(); - for (String s : entries) { - String[] countValue = s.split(":"); - all.add(new CompletionCandidate(countValue[0], Integer.valueOf(countValue[1]))); - } - return all; - } - - private static class ReifyTimestamps<T> - extends PTransform<PCollection<TimestampedValue<T>>, PCollection<T>> { - @Override - public PCollection<T> apply(PCollection<TimestampedValue<T>> input) { - return input.apply(ParDo.of(new DoFn<TimestampedValue<T>, T>() { - @Override - public void processElement(ProcessContext c) { - c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp()); - } - })); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java deleted file mode 100644 index 5989ce8..0000000 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TfIdfTest.java +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.complete; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; -import com.google.cloud.dataflow.sdk.testing.PAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.Keys; -import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.net.URI; -import java.util.Arrays; - -/** - * Tests of {@link TfIdf}. - */ -@RunWith(JUnit4.class) -public class TfIdfTest { - - /** Test that the example runs. */ - @Test - @Category(RunnableOnService.class) - public void testTfIdf() throws Exception { - Pipeline pipeline = TestPipeline.create(); - - pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - - PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline - .apply(Create.of( - KV.of(new URI("x"), "a b c d"), - KV.of(new URI("y"), "a b c"), - KV.of(new URI("z"), "a m n"))) - .apply(new TfIdf.ComputeTfIdf()); - - PCollection<String> words = wordToUriAndTfIdf - .apply(Keys.<String>create()) - .apply(RemoveDuplicates.<String>create()); - - PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d")); - - pipeline.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java deleted file mode 100644 index 52f69b0..0000000 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/complete/TopWikipediaSessionsTest.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.complete; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.PAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Arrays; - -/** Unit tests for {@link TopWikipediaSessions}. */ -@RunWith(JUnit4.class) -public class TopWikipediaSessionsTest { - @Test - @Category(RunnableOnService.class) - public void testComputeTopUsers() { - Pipeline p = TestPipeline.create(); - - PCollection<String> output = - p.apply(Create.of(Arrays.asList( - new TableRow().set("timestamp", 0).set("contributor_username", "user1"), - new TableRow().set("timestamp", 1).set("contributor_username", "user1"), - new TableRow().set("timestamp", 2).set("contributor_username", "user1"), - new TableRow().set("timestamp", 0).set("contributor_username", "user2"), - new TableRow().set("timestamp", 1).set("contributor_username", "user2"), - new TableRow().set("timestamp", 3601).set("contributor_username", "user2"), - new TableRow().set("timestamp", 3602).set("contributor_username", "user2"), - new TableRow().set("timestamp", 35 * 24 * 3600).set("contributor_username", "user3")))) - .apply(new TopWikipediaSessions.ComputeTopSessions(1.0)); - - PAssert.that(output).containsInAnyOrder(Arrays.asList( - "user1 : [1970-01-01T00:00:00.000Z..1970-01-01T01:00:02.000Z)" - + " : 3 : 1970-01-01T00:00:00.000Z", - "user3 : [1970-02-05T00:00:00.000Z..1970-02-05T01:00:00.000Z)" - + " : 1 : 1970-02-01T00:00:00.000Z")); - - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoesTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoesTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoesTest.java deleted file mode 100644 index aadfa51..0000000 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/BigQueryTornadoesTest.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.cookbook; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.examples.cookbook.BigQueryTornadoes.ExtractTornadoesFn; -import com.google.cloud.dataflow.examples.cookbook.BigQueryTornadoes.FormatCountsFn; -import com.google.cloud.dataflow.sdk.transforms.DoFnTester; -import com.google.cloud.dataflow.sdk.values.KV; - -import org.hamcrest.CoreMatchers; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.List; - -/** - * Test case for {@link BigQueryTornadoes}. - */ -@RunWith(JUnit4.class) -public class BigQueryTornadoesTest { - - @Test - public void testExtractTornadoes() throws Exception { - TableRow row = new TableRow() - .set("month", "6") - .set("tornado", true); - DoFnTester<TableRow, Integer> extractWordsFn = - DoFnTester.of(new ExtractTornadoesFn()); - Assert.assertThat(extractWordsFn.processBatch(row), - CoreMatchers.hasItems(6)); - } - - @Test - public void testNoTornadoes() throws Exception { - TableRow row = new TableRow() - .set("month", 6) - .set("tornado", false); - DoFnTester<TableRow, Integer> extractWordsFn = - DoFnTester.of(new ExtractTornadoesFn()); - Assert.assertTrue(extractWordsFn.processBatch(row).isEmpty()); - } - - @Test - @SuppressWarnings({"rawtypes", "unchecked"}) - public void testFormatCounts() throws Exception { - DoFnTester<KV<Integer, Long>, TableRow> formatCountsFn = - DoFnTester.of(new FormatCountsFn()); - KV empty[] = {}; - List<TableRow> results = formatCountsFn.processBatch(empty); - Assert.assertTrue(results.size() == 0); - KV input[] = { KV.of(3, 0L), - KV.of(4, Long.MAX_VALUE), - KV.of(5, Long.MIN_VALUE) }; - results = formatCountsFn.processBatch(input); - Assert.assertEquals(results.size(), 3); - Assert.assertEquals(results.get(0).get("month"), 3); - Assert.assertEquals(results.get(0).get("tornado_count"), 0L); - Assert.assertEquals(results.get(1).get("month"), 4); - Assert.assertEquals(results.get(1).get("tornado_count"), Long.MAX_VALUE); - Assert.assertEquals(results.get(2).get("month"), 5); - Assert.assertEquals(results.get(2).get("tornado_count"), Long.MIN_VALUE); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamplesTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamplesTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamplesTest.java deleted file mode 100644 index 56efe76..0000000 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/CombinePerKeyExamplesTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.cookbook; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.examples.cookbook.CombinePerKeyExamples.ExtractLargeWordsFn; -import com.google.cloud.dataflow.examples.cookbook.CombinePerKeyExamples.FormatShakespeareOutputFn; -import com.google.cloud.dataflow.sdk.transforms.DoFnTester; -import com.google.cloud.dataflow.sdk.values.KV; - -import org.hamcrest.CoreMatchers; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.List; - -/** Unit tests for {@link CombinePerKeyExamples}. */ -@RunWith(JUnit4.class) -public class CombinePerKeyExamplesTest { - - private static final TableRow row1 = new TableRow() - .set("corpus", "king_lear").set("word", "snuffleupaguses"); - private static final TableRow row2 = new TableRow() - .set("corpus", "macbeth").set("word", "antidisestablishmentarianism"); - private static final TableRow row3 = new TableRow() - .set("corpus", "king_lear").set("word", "antidisestablishmentarianism"); - private static final TableRow row4 = new TableRow() - .set("corpus", "macbeth").set("word", "bob"); - private static final TableRow row5 = new TableRow() - .set("corpus", "king_lear").set("word", "hi"); - - static final TableRow[] ROWS_ARRAY = new TableRow[] { - row1, row2, row3, row4, row5 - }; - - private static final KV<String, String> tuple1 = KV.of("snuffleupaguses", "king_lear"); - private static final KV<String, String> tuple2 = KV.of("antidisestablishmentarianism", "macbeth"); - private static final KV<String, String> tuple3 = KV.of("antidisestablishmentarianism", - "king_lear"); - - private static final KV<String, String> combinedTuple1 = KV.of("antidisestablishmentarianism", - "king_lear,macbeth"); - private static final KV<String, String> combinedTuple2 = KV.of("snuffleupaguses", "king_lear"); - - @SuppressWarnings({"unchecked", "rawtypes"}) - static final KV<String, String>[] COMBINED_TUPLES_ARRAY = new KV[] { - combinedTuple1, combinedTuple2 - }; - - private static final TableRow resultRow1 = new TableRow() - .set("word", "snuffleupaguses").set("all_plays", "king_lear"); - private static final TableRow resultRow2 = new TableRow() - .set("word", "antidisestablishmentarianism") - .set("all_plays", "king_lear,macbeth"); - - @Test - public void testExtractLargeWordsFn() { - DoFnTester<TableRow, KV<String, String>> extractLargeWordsFn = - DoFnTester.of(new ExtractLargeWordsFn()); - List<KV<String, String>> results = extractLargeWordsFn.processBatch(ROWS_ARRAY); - Assert.assertThat(results, CoreMatchers.hasItem(tuple1)); - Assert.assertThat(results, CoreMatchers.hasItem(tuple2)); - Assert.assertThat(results, CoreMatchers.hasItem(tuple3)); - } - - @Test - public void testFormatShakespeareOutputFn() { - DoFnTester<KV<String, String>, TableRow> formatShakespeareOutputFn = - DoFnTester.of(new FormatShakespeareOutputFn()); - List<TableRow> results = formatShakespeareOutputFn.processBatch(COMBINED_TUPLES_ARRAY); - Assert.assertThat(results, CoreMatchers.hasItem(resultRow1)); - Assert.assertThat(results, CoreMatchers.hasItem(resultRow2)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java deleted file mode 100644 index 6e9e3ed..0000000 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/DeDupExampleTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.cookbook; - -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; -import com.google.cloud.dataflow.sdk.testing.PAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Arrays; -import java.util.List; - -/** Unit tests for {@link DeDupExample}. */ -@RunWith(JUnit4.class) -public class DeDupExampleTest { - - @Test - @Category(RunnableOnService.class) - public void testRemoveDuplicates() { - List<String> strings = Arrays.asList( - "k1", - "k5", - "k5", - "k2", - "k1", - "k2", - "k3"); - - Pipeline p = TestPipeline.create(); - - PCollection<String> input = - p.apply(Create.of(strings) - .withCoder(StringUtf8Coder.of())); - - PCollection<String> output = - input.apply(RemoveDuplicates.<String>create()); - - PAssert.that(output) - .containsInAnyOrder("k1", "k5", "k2", "k3"); - p.run(); - } - - @Test - @Category(RunnableOnService.class) - public void testRemoveDuplicatesEmpty() { - List<String> strings = Arrays.asList(); - - Pipeline p = TestPipeline.create(); - - PCollection<String> input = - p.apply(Create.of(strings) - .withCoder(StringUtf8Coder.of())); - - PCollection<String> output = - input.apply(RemoveDuplicates.<String>create()); - - PAssert.that(output).empty(); - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/FilterExamplesTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/FilterExamplesTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/FilterExamplesTest.java deleted file mode 100644 index 2bd94d5..0000000 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/FilterExamplesTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.cookbook; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.examples.cookbook.FilterExamples.FilterSingleMonthDataFn; -import com.google.cloud.dataflow.examples.cookbook.FilterExamples.ProjectionFn; -import com.google.cloud.dataflow.sdk.transforms.DoFnTester; - -import org.hamcrest.CoreMatchers; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Arrays; -import java.util.List; - -/** Unit tests for {@link FilterExamples}. */ -@RunWith(JUnit4.class) -public class FilterExamplesTest { - - private static final TableRow row1 = new TableRow() - .set("month", "6").set("day", "21") - .set("year", "2014").set("mean_temp", "85.3") - .set("tornado", true); - private static final TableRow row2 = new TableRow() - .set("month", "7").set("day", "20") - .set("year", "2014").set("mean_temp", "75.4") - .set("tornado", false); - private static final TableRow row3 = new TableRow() - .set("month", "6").set("day", "18") - .set("year", "2014").set("mean_temp", "45.3") - .set("tornado", true); - static final TableRow[] ROWS_ARRAY = new TableRow[] { - row1, row2, row3 - }; - static final List<TableRow> ROWS = Arrays.asList(ROWS_ARRAY); - - private static final TableRow outRow1 = new TableRow() - .set("year", 2014).set("month", 6) - .set("day", 21).set("mean_temp", 85.3); - private static final TableRow outRow2 = new TableRow() - .set("year", 2014).set("month", 7) - .set("day", 20).set("mean_temp", 75.4); - private static final TableRow outRow3 = new TableRow() - .set("year", 2014).set("month", 6) - .set("day", 18).set("mean_temp", 45.3); - private static final TableRow[] PROJROWS_ARRAY = new TableRow[] { - outRow1, outRow2, outRow3 - }; - - - @Test - public void testProjectionFn() { - DoFnTester<TableRow, TableRow> projectionFn = - DoFnTester.of(new ProjectionFn()); - List<TableRow> results = projectionFn.processBatch(ROWS_ARRAY); - Assert.assertThat(results, CoreMatchers.hasItem(outRow1)); - Assert.assertThat(results, CoreMatchers.hasItem(outRow2)); - Assert.assertThat(results, CoreMatchers.hasItem(outRow3)); - } - - @Test - public void testFilterSingleMonthDataFn() { - DoFnTester<TableRow, TableRow> filterSingleMonthDataFn = - DoFnTester.of(new FilterSingleMonthDataFn(7)); - List<TableRow> results = filterSingleMonthDataFn.processBatch(PROJROWS_ARRAY); - Assert.assertThat(results, CoreMatchers.hasItem(outRow2)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java deleted file mode 100644 index 259ce08..0000000 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/JoinExamplesTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.cookbook; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.examples.cookbook.JoinExamples.ExtractCountryInfoFn; -import com.google.cloud.dataflow.examples.cookbook.JoinExamples.ExtractEventDataFn; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.PAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFnTester; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; - -import org.hamcrest.CoreMatchers; -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Arrays; -import java.util.List; - -/** Unit tests for {@link JoinExamples}. */ -@RunWith(JUnit4.class) -public class JoinExamplesTest { - - private static final TableRow row1 = new TableRow() - .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212") - .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com"); - private static final TableRow row2 = new TableRow() - .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212") - .set("Actor1Name", "LAOS").set("SOURCEURL", "http://www.chicagotribune.com"); - private static final TableRow row3 = new TableRow() - .set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213") - .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com"); - static final TableRow[] EVENTS = new TableRow[] { - row1, row2, row3 - }; - static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS); - - private static final KV<String, String> kv1 = KV.of("VM", - "Date: 20141212, Actor1: LAOS, url: http://www.chicagotribune.com"); - private static final KV<String, String> kv2 = KV.of("BE", - "Date: 20141213, Actor1: AFGHANISTAN, url: http://cnn.com"); - private static final KV<String, String> kv3 = KV.of("BE", "Belgium"); - private static final KV<String, String> kv4 = KV.of("VM", "Vietnam"); - - private static final TableRow cc1 = new TableRow() - .set("FIPSCC", "VM").set("HumanName", "Vietnam"); - private static final TableRow cc2 = new TableRow() - .set("FIPSCC", "BE").set("HumanName", "Belgium"); - static final TableRow[] CCS = new TableRow[] { - cc1, cc2 - }; - static final List<TableRow> CC_ARRAY = Arrays.asList(CCS); - - static final String[] JOINED_EVENTS = new String[] { - "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: LAOS, " - + "url: http://www.chicagotribune.com", - "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, Actor1: BANGKOK, " - + "url: http://cnn.com", - "Country code: BE, Country name: Belgium, Event info: Date: 20141213, Actor1: AFGHANISTAN, " - + "url: http://cnn.com" - }; - - @Test - public void testExtractEventDataFn() { - DoFnTester<TableRow, KV<String, String>> extractEventDataFn = - DoFnTester.of(new ExtractEventDataFn()); - List<KV<String, String>> results = extractEventDataFn.processBatch(EVENTS); - Assert.assertThat(results, CoreMatchers.hasItem(kv1)); - Assert.assertThat(results, CoreMatchers.hasItem(kv2)); - } - - @Test - public void testExtractCountryInfoFn() { - DoFnTester<TableRow, KV<String, String>> extractCountryInfoFn = - DoFnTester.of(new ExtractCountryInfoFn()); - List<KV<String, String>> results = extractCountryInfoFn.processBatch(CCS); - Assert.assertThat(results, CoreMatchers.hasItem(kv3)); - Assert.assertThat(results, CoreMatchers.hasItem(kv4)); - } - - - @Test - @Category(RunnableOnService.class) - public void testJoin() throws java.lang.Exception { - Pipeline p = TestPipeline.create(); - PCollection<TableRow> input1 = p.apply("CreateEvent", Create.of(EVENT_ARRAY)); - PCollection<TableRow> input2 = p.apply("CreateCC", Create.of(CC_ARRAY)); - - PCollection<String> output = JoinExamples.joinEvents(input1, input2); - PAssert.that(output).containsInAnyOrder(JOINED_EVENTS); - p.run(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamplesTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamplesTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamplesTest.java deleted file mode 100644 index f3ffcea..0000000 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/MaxPerKeyExamplesTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.cookbook; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.examples.cookbook.MaxPerKeyExamples.ExtractTempFn; -import com.google.cloud.dataflow.examples.cookbook.MaxPerKeyExamples.FormatMaxesFn; -import com.google.cloud.dataflow.sdk.transforms.DoFnTester; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.common.collect.ImmutableList; - -import org.hamcrest.CoreMatchers; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.List; - -/** Unit tests for {@link MaxPerKeyExamples}. */ -@RunWith(JUnit4.class) -public class MaxPerKeyExamplesTest { - - private static final TableRow row1 = new TableRow() - .set("month", "6").set("day", "21") - .set("year", "2014").set("mean_temp", "85.3") - .set("tornado", true); - private static final TableRow row2 = new TableRow() - .set("month", "7").set("day", "20") - .set("year", "2014").set("mean_temp", "75.4") - .set("tornado", false); - private static final TableRow row3 = new TableRow() - .set("month", "6").set("day", "18") - .set("year", "2014").set("mean_temp", "45.3") - .set("tornado", true); - private static final List<TableRow> TEST_ROWS = ImmutableList.of(row1, row2, row3); - - private static final KV<Integer, Double> kv1 = KV.of(6, 85.3); - private static final KV<Integer, Double> kv2 = KV.of(6, 45.3); - private static final KV<Integer, Double> kv3 = KV.of(7, 75.4); - - private static final List<KV<Integer, Double>> TEST_KVS = ImmutableList.of(kv1, kv2, kv3); - - private static final TableRow resultRow1 = new TableRow() - .set("month", 6) - .set("max_mean_temp", 85.3); - private static final TableRow resultRow2 = new TableRow() - .set("month", 7) - .set("max_mean_temp", 75.4); - - - @Test - public void testExtractTempFn() { - DoFnTester<TableRow, KV<Integer, Double>> extractTempFn = - DoFnTester.of(new ExtractTempFn()); - List<KV<Integer, Double>> results = extractTempFn.processBatch(TEST_ROWS); - Assert.assertThat(results, CoreMatchers.hasItem(kv1)); - Assert.assertThat(results, CoreMatchers.hasItem(kv2)); - Assert.assertThat(results, CoreMatchers.hasItem(kv3)); - } - - @Test - public void testFormatMaxesFn() { - DoFnTester<KV<Integer, Double>, TableRow> formatMaxesFnFn = - DoFnTester.of(new FormatMaxesFn()); - List<TableRow> results = formatMaxesFnFn.processBatch(TEST_KVS); - Assert.assertThat(results, CoreMatchers.hasItem(resultRow1)); - Assert.assertThat(results, CoreMatchers.hasItem(resultRow2)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java deleted file mode 100644 index 3664561..0000000 --- a/examples/java/src/test/java/com/google/cloud/dataflow/examples/cookbook/TriggerExampleTest.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.google.cloud.dataflow.examples.cookbook; - -import com.google.api.services.bigquery.model.TableRow; -import com.google.cloud.dataflow.examples.cookbook.TriggerExample.ExtractFlowInfo; -import com.google.cloud.dataflow.examples.cookbook.TriggerExample.TotalFlow; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.testing.PAssert; -import com.google.cloud.dataflow.sdk.testing.RunnableOnService; -import com.google.cloud.dataflow.sdk.testing.TestPipeline; -import com.google.cloud.dataflow.sdk.transforms.Create; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.DoFnTester; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows; -import com.google.cloud.dataflow.sdk.transforms.windowing.Window; -import com.google.cloud.dataflow.sdk.values.KV; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.dataflow.sdk.values.TimestampedValue; - -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Assert; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -import java.util.Arrays; -import java.util.List; - -/** - * Unit Tests for {@link TriggerExample}. - * The results generated by triggers are by definition non-deterministic and hence hard to test. - * The unit test does not test all aspects of the example. - */ -@RunWith(JUnit4.class) -public class TriggerExampleTest { - - private static final String[] INPUT = - {"01/01/2010 00:00:00,1108302,94,E,ML,36,100,29,0.0065,66,9,1,0.001,74.8,1,9,3,0.0028,71,1,9," - + "12,0.0099,67.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,,,,0", "01/01/2010 00:00:00," - + "1100333,5,N,FR,9,0,39,,,9,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,,0,,,,"}; - - private static final List<TimestampedValue<String>> TIME_STAMPED_INPUT = Arrays.asList( - TimestampedValue.of("01/01/2010 00:00:00,1108302,5,W,ML,36,100,30,0.0065,66,9,1,0.001," - + "74.8,1,9,3,0.0028,71,1,9,12,0.0099,87.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,," - + ",,0", new Instant(60000)), - TimestampedValue.of("01/01/2010 00:00:00,1108302,110,E,ML,36,100,40,0.0065,66,9,1,0.001," - + "74.8,1,9,3,0.0028,71,1,9,12,0.0099,67.4,1,9,13,0.0121,99.0,1,,,,,0,,,,,0,,,,,0,,," - + ",,0", new Instant(1)), - TimestampedValue.of("01/01/2010 00:00:00,1108302,110,E,ML,36,100,50,0.0065,66,9,1," - + "0.001,74.8,1,9,3,0.0028,71,1,9,12,0.0099,97.4,1,9,13,0.0121,50.0,1,,,,,0,,,,,0" - + ",,,,,0,,,,,0", new Instant(1))); - - private static final TableRow OUT_ROW_1 = new TableRow() - .set("trigger_type", "default") - .set("freeway", "5").set("total_flow", 30) - .set("number_of_records", 1) - .set("isFirst", true).set("isLast", true) - .set("timing", "ON_TIME") - .set("window", "[1970-01-01T00:01:00.000Z..1970-01-01T00:02:00.000Z)"); - - private static final TableRow OUT_ROW_2 = new TableRow() - .set("trigger_type", "default") - .set("freeway", "110").set("total_flow", 90) - .set("number_of_records", 2) - .set("isFirst", true).set("isLast", true) - .set("timing", "ON_TIME") - .set("window", "[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)"); - - @Test - public void testExtractTotalFlow() { - DoFnTester<String, KV<String, Integer>> extractFlowInfow = DoFnTester - .of(new ExtractFlowInfo()); - - List<KV<String, Integer>> results = extractFlowInfow.processBatch(INPUT); - Assert.assertEquals(results.size(), 1); - Assert.assertEquals(results.get(0).getKey(), "94"); - Assert.assertEquals(results.get(0).getValue(), new Integer(29)); - - List<KV<String, Integer>> output = extractFlowInfow.processBatch(""); - Assert.assertEquals(output.size(), 0); - } - - @Test - @Category(RunnableOnService.class) - public void testTotalFlow () { - Pipeline pipeline = TestPipeline.create(); - PCollection<KV<String, Integer>> flow = pipeline - .apply(Create.timestamped(TIME_STAMPED_INPUT)) - .apply(ParDo.of(new ExtractFlowInfo())); - - PCollection<TableRow> totalFlow = flow - .apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(1)))) - .apply(new TotalFlow("default")); - - PCollection<TableRow> results = totalFlow.apply(ParDo.of(new FormatResults())); - - - PAssert.that(results).containsInAnyOrder(OUT_ROW_1, OUT_ROW_2); - pipeline.run(); - - } - - static class FormatResults extends DoFn<TableRow, TableRow> { - @Override - public void processElement(ProcessContext c) throws Exception { - TableRow element = c.element(); - TableRow row = new TableRow() - .set("trigger_type", element.get("trigger_type")) - .set("freeway", element.get("freeway")) - .set("total_flow", element.get("total_flow")) - .set("number_of_records", element.get("number_of_records")) - .set("isFirst", element.get("isFirst")) - .set("isLast", element.get("isLast")) - .set("timing", element.get("timing")) - .set("window", element.get("window")); - c.output(row); - } - } -} - - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java new file mode 100644 index 0000000..f9e4a2d --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/DebuggingWordCountTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples; + +import com.google.common.io.Files; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.io.File; +import java.nio.charset.StandardCharsets; + +/** + * Tests for {@link DebuggingWordCount}. + */ +@RunWith(JUnit4.class) +public class DebuggingWordCountTest { + @Rule public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Test + public void testDebuggingWordCount() throws Exception { + File file = tmpFolder.newFile(); + Files.write("stomach secret Flourish message Flourish here Flourish", file, + StandardCharsets.UTF_8); + DebuggingWordCount.main(new String[]{"--inputFile=" + file.getAbsolutePath()}); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java new file mode 100644 index 0000000..b1f4f27 --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples; + +import com.google.cloud.dataflow.examples.WordCount.CountWords; +import com.google.cloud.dataflow.examples.WordCount.ExtractWordsFn; +import com.google.cloud.dataflow.examples.WordCount.FormatAsTextFn; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.testing.PAssert; +import com.google.cloud.dataflow.sdk.testing.RunnableOnService; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFnTester; +import com.google.cloud.dataflow.sdk.transforms.MapElements; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; +import java.util.List; + +/** + * Tests of WordCount. + */ +@RunWith(JUnit4.class) +public class WordCountTest { + + /** Example test that tests a specific DoFn. */ + @Test + public void testExtractWordsFn() { + DoFnTester<String, String> extractWordsFn = + DoFnTester.of(new ExtractWordsFn()); + + Assert.assertThat(extractWordsFn.processBatch(" some input words "), + CoreMatchers.hasItems("some", "input", "words")); + Assert.assertThat(extractWordsFn.processBatch(" "), + CoreMatchers.<String>hasItems()); + Assert.assertThat(extractWordsFn.processBatch(" some ", " input", " words"), + CoreMatchers.hasItems("some", "input", "words")); + } + + static final String[] WORDS_ARRAY = new String[] { + "hi there", "hi", "hi sue bob", + "hi sue", "", "bob hi"}; + + static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); + + static final String[] COUNTS_ARRAY = new String[] { + "hi: 5", "there: 1", "sue: 2", "bob: 2"}; + + /** Example test that tests a PTransform by using an in-memory input and inspecting the output. */ + @Test + @Category(RunnableOnService.class) + public void testCountWords() throws Exception { + Pipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of())); + + PCollection<String> output = input.apply(new CountWords()) + .apply(MapElements.via(new FormatAsTextFn())); + + PAssert.that(output).containsInAnyOrder(COUNTS_ARRAY); + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java new file mode 100644 index 0000000..a70aee1 --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.complete; + +import com.google.cloud.dataflow.examples.complete.AutoComplete.CompletionCandidate; +import com.google.cloud.dataflow.examples.complete.AutoComplete.ComputeTopCompletions; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.testing.PAssert; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.Filter; +import com.google.cloud.dataflow.sdk.transforms.PTransform; +import com.google.cloud.dataflow.sdk.transforms.ParDo; +import com.google.cloud.dataflow.sdk.transforms.SerializableFunction; +import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows; +import com.google.cloud.dataflow.sdk.transforms.windowing.Window; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; +import com.google.cloud.dataflow.sdk.values.TimestampedValue; + +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +/** + * Tests of AutoComplete. + */ +@RunWith(Parameterized.class) +public class AutoCompleteTest implements Serializable { + private boolean recursive; + + public AutoCompleteTest(Boolean recursive) { + this.recursive = recursive; + } + + @Parameterized.Parameters + public static Collection<Object[]> testRecursive() { + return Arrays.asList(new Object[][] { + { true }, + { false } + }); + } + + @Test + public void testAutoComplete() { + List<String> words = Arrays.asList( + "apple", + "apple", + "apricot", + "banana", + "blackberry", + "blackberry", + "blackberry", + "blueberry", + "blueberry", + "cherry"); + + Pipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of(words)); + + PCollection<KV<String, List<CompletionCandidate>>> output = + input.apply(new ComputeTopCompletions(2, recursive)) + .apply(Filter.byPredicate( + new SerializableFunction<KV<String, List<CompletionCandidate>>, Boolean>() { + @Override + public Boolean apply(KV<String, List<CompletionCandidate>> element) { + return element.getKey().length() <= 2; + } + })); + + PAssert.that(output).containsInAnyOrder( + KV.of("a", parseList("apple:2", "apricot:1")), + KV.of("ap", parseList("apple:2", "apricot:1")), + KV.of("b", parseList("blackberry:3", "blueberry:2")), + KV.of("ba", parseList("banana:1")), + KV.of("bl", parseList("blackberry:3", "blueberry:2")), + KV.of("c", parseList("cherry:1")), + KV.of("ch", parseList("cherry:1"))); + p.run(); + } + + @Test + public void testTinyAutoComplete() { + List<String> words = Arrays.asList("x", "x", "x", "xy", "xy", "xyz"); + + Pipeline p = TestPipeline.create(); + + PCollection<String> input = p.apply(Create.of(words)); + + PCollection<KV<String, List<CompletionCandidate>>> output = + input.apply(new ComputeTopCompletions(2, recursive)); + + PAssert.that(output).containsInAnyOrder( + KV.of("x", parseList("x:3", "xy:2")), + KV.of("xy", parseList("xy:2", "xyz:1")), + KV.of("xyz", parseList("xyz:1"))); + p.run(); + } + + @Test + public void testWindowedAutoComplete() { + List<TimestampedValue<String>> words = Arrays.asList( + TimestampedValue.of("xA", new Instant(1)), + TimestampedValue.of("xA", new Instant(1)), + TimestampedValue.of("xB", new Instant(1)), + TimestampedValue.of("xB", new Instant(2)), + TimestampedValue.of("xB", new Instant(2))); + + Pipeline p = TestPipeline.create(); + + PCollection<String> input = p + .apply(Create.of(words)) + .apply(new ReifyTimestamps<String>()); + + PCollection<KV<String, List<CompletionCandidate>>> output = + input.apply(Window.<String>into(SlidingWindows.of(new Duration(2)))) + .apply(new ComputeTopCompletions(2, recursive)); + + PAssert.that(output).containsInAnyOrder( + // Window [0, 2) + KV.of("x", parseList("xA:2", "xB:1")), + KV.of("xA", parseList("xA:2")), + KV.of("xB", parseList("xB:1")), + + // Window [1, 3) + KV.of("x", parseList("xB:3", "xA:2")), + KV.of("xA", parseList("xA:2")), + KV.of("xB", parseList("xB:3")), + + // Window [2, 3) + KV.of("x", parseList("xB:2")), + KV.of("xB", parseList("xB:2"))); + p.run(); + } + + private static List<CompletionCandidate> parseList(String... entries) { + List<CompletionCandidate> all = new ArrayList<>(); + for (String s : entries) { + String[] countValue = s.split(":"); + all.add(new CompletionCandidate(countValue[0], Integer.valueOf(countValue[1]))); + } + return all; + } + + private static class ReifyTimestamps<T> + extends PTransform<PCollection<TimestampedValue<T>>, PCollection<T>> { + @Override + public PCollection<T> apply(PCollection<TimestampedValue<T>> input) { + return input.apply(ParDo.of(new DoFn<TimestampedValue<T>, T>() { + @Override + public void processElement(ProcessContext c) { + c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp()); + } + })); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java new file mode 100644 index 0000000..5989ce8 --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TfIdfTest.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.complete; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.StringDelegateCoder; +import com.google.cloud.dataflow.sdk.testing.PAssert; +import com.google.cloud.dataflow.sdk.testing.RunnableOnService; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.Keys; +import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; +import com.google.cloud.dataflow.sdk.values.KV; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.net.URI; +import java.util.Arrays; + +/** + * Tests of {@link TfIdf}. + */ +@RunWith(JUnit4.class) +public class TfIdfTest { + + /** Test that the example runs. */ + @Test + @Category(RunnableOnService.class) + public void testTfIdf() throws Exception { + Pipeline pipeline = TestPipeline.create(); + + pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); + + PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline + .apply(Create.of( + KV.of(new URI("x"), "a b c d"), + KV.of(new URI("y"), "a b c"), + KV.of(new URI("z"), "a m n"))) + .apply(new TfIdf.ComputeTfIdf()); + + PCollection<String> words = wordToUriAndTfIdf + .apply(Keys.<String>create()) + .apply(RemoveDuplicates.<String>create()); + + PAssert.that(words).containsInAnyOrder(Arrays.asList("a", "m", "n", "b", "c", "d")); + + pipeline.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java new file mode 100644 index 0000000..52f69b0 --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/complete/TopWikipediaSessionsTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.complete; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.testing.PAssert; +import com.google.cloud.dataflow.sdk.testing.RunnableOnService; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; + +/** Unit tests for {@link TopWikipediaSessions}. */ +@RunWith(JUnit4.class) +public class TopWikipediaSessionsTest { + @Test + @Category(RunnableOnService.class) + public void testComputeTopUsers() { + Pipeline p = TestPipeline.create(); + + PCollection<String> output = + p.apply(Create.of(Arrays.asList( + new TableRow().set("timestamp", 0).set("contributor_username", "user1"), + new TableRow().set("timestamp", 1).set("contributor_username", "user1"), + new TableRow().set("timestamp", 2).set("contributor_username", "user1"), + new TableRow().set("timestamp", 0).set("contributor_username", "user2"), + new TableRow().set("timestamp", 1).set("contributor_username", "user2"), + new TableRow().set("timestamp", 3601).set("contributor_username", "user2"), + new TableRow().set("timestamp", 3602).set("contributor_username", "user2"), + new TableRow().set("timestamp", 35 * 24 * 3600).set("contributor_username", "user3")))) + .apply(new TopWikipediaSessions.ComputeTopSessions(1.0)); + + PAssert.that(output).containsInAnyOrder(Arrays.asList( + "user1 : [1970-01-01T00:00:00.000Z..1970-01-01T01:00:02.000Z)" + + " : 3 : 1970-01-01T00:00:00.000Z", + "user3 : [1970-02-05T00:00:00.000Z..1970-02-05T01:00:00.000Z)" + + " : 1 : 1970-02-01T00:00:00.000Z")); + + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java new file mode 100644 index 0000000..aadfa51 --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/BigQueryTornadoesTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.cookbook; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.dataflow.examples.cookbook.BigQueryTornadoes.ExtractTornadoesFn; +import com.google.cloud.dataflow.examples.cookbook.BigQueryTornadoes.FormatCountsFn; +import com.google.cloud.dataflow.sdk.transforms.DoFnTester; +import com.google.cloud.dataflow.sdk.values.KV; + +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.List; + +/** + * Test case for {@link BigQueryTornadoes}. + */ +@RunWith(JUnit4.class) +public class BigQueryTornadoesTest { + + @Test + public void testExtractTornadoes() throws Exception { + TableRow row = new TableRow() + .set("month", "6") + .set("tornado", true); + DoFnTester<TableRow, Integer> extractWordsFn = + DoFnTester.of(new ExtractTornadoesFn()); + Assert.assertThat(extractWordsFn.processBatch(row), + CoreMatchers.hasItems(6)); + } + + @Test + public void testNoTornadoes() throws Exception { + TableRow row = new TableRow() + .set("month", 6) + .set("tornado", false); + DoFnTester<TableRow, Integer> extractWordsFn = + DoFnTester.of(new ExtractTornadoesFn()); + Assert.assertTrue(extractWordsFn.processBatch(row).isEmpty()); + } + + @Test + @SuppressWarnings({"rawtypes", "unchecked"}) + public void testFormatCounts() throws Exception { + DoFnTester<KV<Integer, Long>, TableRow> formatCountsFn = + DoFnTester.of(new FormatCountsFn()); + KV empty[] = {}; + List<TableRow> results = formatCountsFn.processBatch(empty); + Assert.assertTrue(results.size() == 0); + KV input[] = { KV.of(3, 0L), + KV.of(4, Long.MAX_VALUE), + KV.of(5, Long.MIN_VALUE) }; + results = formatCountsFn.processBatch(input); + Assert.assertEquals(results.size(), 3); + Assert.assertEquals(results.get(0).get("month"), 3); + Assert.assertEquals(results.get(0).get("tornado_count"), 0L); + Assert.assertEquals(results.get(1).get("month"), 4); + Assert.assertEquals(results.get(1).get("tornado_count"), Long.MAX_VALUE); + Assert.assertEquals(results.get(2).get("month"), 5); + Assert.assertEquals(results.get(2).get("tornado_count"), Long.MIN_VALUE); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java new file mode 100644 index 0000000..56efe76 --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/CombinePerKeyExamplesTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.cookbook; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.dataflow.examples.cookbook.CombinePerKeyExamples.ExtractLargeWordsFn; +import com.google.cloud.dataflow.examples.cookbook.CombinePerKeyExamples.FormatShakespeareOutputFn; +import com.google.cloud.dataflow.sdk.transforms.DoFnTester; +import com.google.cloud.dataflow.sdk.values.KV; + +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.List; + +/** Unit tests for {@link CombinePerKeyExamples}. */ +@RunWith(JUnit4.class) +public class CombinePerKeyExamplesTest { + + private static final TableRow row1 = new TableRow() + .set("corpus", "king_lear").set("word", "snuffleupaguses"); + private static final TableRow row2 = new TableRow() + .set("corpus", "macbeth").set("word", "antidisestablishmentarianism"); + private static final TableRow row3 = new TableRow() + .set("corpus", "king_lear").set("word", "antidisestablishmentarianism"); + private static final TableRow row4 = new TableRow() + .set("corpus", "macbeth").set("word", "bob"); + private static final TableRow row5 = new TableRow() + .set("corpus", "king_lear").set("word", "hi"); + + static final TableRow[] ROWS_ARRAY = new TableRow[] { + row1, row2, row3, row4, row5 + }; + + private static final KV<String, String> tuple1 = KV.of("snuffleupaguses", "king_lear"); + private static final KV<String, String> tuple2 = KV.of("antidisestablishmentarianism", "macbeth"); + private static final KV<String, String> tuple3 = KV.of("antidisestablishmentarianism", + "king_lear"); + + private static final KV<String, String> combinedTuple1 = KV.of("antidisestablishmentarianism", + "king_lear,macbeth"); + private static final KV<String, String> combinedTuple2 = KV.of("snuffleupaguses", "king_lear"); + + @SuppressWarnings({"unchecked", "rawtypes"}) + static final KV<String, String>[] COMBINED_TUPLES_ARRAY = new KV[] { + combinedTuple1, combinedTuple2 + }; + + private static final TableRow resultRow1 = new TableRow() + .set("word", "snuffleupaguses").set("all_plays", "king_lear"); + private static final TableRow resultRow2 = new TableRow() + .set("word", "antidisestablishmentarianism") + .set("all_plays", "king_lear,macbeth"); + + @Test + public void testExtractLargeWordsFn() { + DoFnTester<TableRow, KV<String, String>> extractLargeWordsFn = + DoFnTester.of(new ExtractLargeWordsFn()); + List<KV<String, String>> results = extractLargeWordsFn.processBatch(ROWS_ARRAY); + Assert.assertThat(results, CoreMatchers.hasItem(tuple1)); + Assert.assertThat(results, CoreMatchers.hasItem(tuple2)); + Assert.assertThat(results, CoreMatchers.hasItem(tuple3)); + } + + @Test + public void testFormatShakespeareOutputFn() { + DoFnTester<KV<String, String>, TableRow> formatShakespeareOutputFn = + DoFnTester.of(new FormatShakespeareOutputFn()); + List<TableRow> results = formatShakespeareOutputFn.processBatch(COMBINED_TUPLES_ARRAY); + Assert.assertThat(results, CoreMatchers.hasItem(resultRow1)); + Assert.assertThat(results, CoreMatchers.hasItem(resultRow2)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java new file mode 100644 index 0000000..6e9e3ed --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/DeDupExampleTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.cookbook; + +import com.google.cloud.dataflow.sdk.Pipeline; +import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder; +import com.google.cloud.dataflow.sdk.testing.PAssert; +import com.google.cloud.dataflow.sdk.testing.RunnableOnService; +import com.google.cloud.dataflow.sdk.testing.TestPipeline; +import com.google.cloud.dataflow.sdk.transforms.Create; +import com.google.cloud.dataflow.sdk.transforms.RemoveDuplicates; +import com.google.cloud.dataflow.sdk.values.PCollection; + +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; +import java.util.List; + +/** Unit tests for {@link DeDupExample}. */ +@RunWith(JUnit4.class) +public class DeDupExampleTest { + + @Test + @Category(RunnableOnService.class) + public void testRemoveDuplicates() { + List<String> strings = Arrays.asList( + "k1", + "k5", + "k5", + "k2", + "k1", + "k2", + "k3"); + + Pipeline p = TestPipeline.create(); + + PCollection<String> input = + p.apply(Create.of(strings) + .withCoder(StringUtf8Coder.of())); + + PCollection<String> output = + input.apply(RemoveDuplicates.<String>create()); + + PAssert.that(output) + .containsInAnyOrder("k1", "k5", "k2", "k3"); + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testRemoveDuplicatesEmpty() { + List<String> strings = Arrays.asList(); + + Pipeline p = TestPipeline.create(); + + PCollection<String> input = + p.apply(Create.of(strings) + .withCoder(StringUtf8Coder.of())); + + PCollection<String> output = + input.apply(RemoveDuplicates.<String>create()); + + PAssert.that(output).empty(); + p.run(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0393a791/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java ---------------------------------------------------------------------- diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java new file mode 100644 index 0000000..2bd94d5 --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/FilterExamplesTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.dataflow.examples.cookbook; + +import com.google.api.services.bigquery.model.TableRow; +import com.google.cloud.dataflow.examples.cookbook.FilterExamples.FilterSingleMonthDataFn; +import com.google.cloud.dataflow.examples.cookbook.FilterExamples.ProjectionFn; +import com.google.cloud.dataflow.sdk.transforms.DoFnTester; + +import org.hamcrest.CoreMatchers; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.Arrays; +import java.util.List; + +/** Unit tests for {@link FilterExamples}. */ +@RunWith(JUnit4.class) +public class FilterExamplesTest { + + private static final TableRow row1 = new TableRow() + .set("month", "6").set("day", "21") + .set("year", "2014").set("mean_temp", "85.3") + .set("tornado", true); + private static final TableRow row2 = new TableRow() + .set("month", "7").set("day", "20") + .set("year", "2014").set("mean_temp", "75.4") + .set("tornado", false); + private static final TableRow row3 = new TableRow() + .set("month", "6").set("day", "18") + .set("year", "2014").set("mean_temp", "45.3") + .set("tornado", true); + static final TableRow[] ROWS_ARRAY = new TableRow[] { + row1, row2, row3 + }; + static final List<TableRow> ROWS = Arrays.asList(ROWS_ARRAY); + + private static final TableRow outRow1 = new TableRow() + .set("year", 2014).set("month", 6) + .set("day", 21).set("mean_temp", 85.3); + private static final TableRow outRow2 = new TableRow() + .set("year", 2014).set("month", 7) + .set("day", 20).set("mean_temp", 75.4); + private static final TableRow outRow3 = new TableRow() + .set("year", 2014).set("month", 6) + .set("day", 18).set("mean_temp", 45.3); + private static final TableRow[] PROJROWS_ARRAY = new TableRow[] { + outRow1, outRow2, outRow3 + }; + + + @Test + public void testProjectionFn() { + DoFnTester<TableRow, TableRow> projectionFn = + DoFnTester.of(new ProjectionFn()); + List<TableRow> results = projectionFn.processBatch(ROWS_ARRAY); + Assert.assertThat(results, CoreMatchers.hasItem(outRow1)); + Assert.assertThat(results, CoreMatchers.hasItem(outRow2)); + Assert.assertThat(results, CoreMatchers.hasItem(outRow3)); + } + + @Test + public void testFilterSingleMonthDataFn() { + DoFnTester<TableRow, TableRow> filterSingleMonthDataFn = + DoFnTester.of(new FilterSingleMonthDataFn(7)); + List<TableRow> results = filterSingleMonthDataFn.processBatch(PROJROWS_ARRAY); + Assert.assertThat(results, CoreMatchers.hasItem(outRow2)); + } +}