http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java deleted file mode 100644 index 547f3c3..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.examples.complete.TfIdf; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.StringDelegateCoder; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.Keys; -import org.apache.beam.sdk.transforms.RemoveDuplicates; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import com.google.common.base.Joiner; - -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.net.URI; - - -public class TfIdfITCase extends JavaProgramTestBase { - - protected String resultPath; - - public TfIdfITCase(){ - } - - static final String[] EXPECTED_RESULT = new String[] { - "a", "m", "n", "b", "c", "d"}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline pipeline = FlinkTestPipeline.createForBatch(); - - pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - - PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline - .apply(Create.of( - KV.of(new URI("x"), "a b c d"), - KV.of(new URI("y"), "a b c"), - KV.of(new URI("z"), "a m n"))) - .apply(new TfIdf.ComputeTfIdf()); - - PCollection<String> words = wordToUriAndTfIdf - .apply(Keys.<String>create()) - .apply(RemoveDuplicates.<String>create()); - - words.apply(TextIO.Write.to(resultPath)); - - pipeline.run(); - } -} -
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java deleted file mode 100644 index 3254e78..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.examples.WordCount; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.values.PCollection; - -import com.google.common.base.Joiner; - -import org.apache.flink.test.util.JavaProgramTestBase; - -import java.util.Arrays; -import java.util.List; - - -public class WordCountITCase extends JavaProgramTestBase { - - protected String resultPath; - - public WordCountITCase(){ - } - - static final String[] WORDS_ARRAY = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - - static final List<String> WORDS = Arrays.asList(WORDS_ARRAY); - - static final String[] COUNTS_ARRAY = new String[] { - "hi: 5", "there: 1", "sue: 2", "bob: 2"}; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createForBatch(); - - PCollection<String> input = p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of()); - - input - .apply(new WordCount.CountWords()) - .apply(MapElements.via(new WordCount.FormatAsTextFn())) - .apply(TextIO.Write.to(resultPath)); - - p.run(); - } -} - http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java deleted file mode 100644 index 6570e7d..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; - -import com.google.common.base.Joiner; - -import org.apache.flink.test.util.JavaProgramTestBase; - - -public class WordCountJoin2ITCase extends JavaProgramTestBase { - - static final String[] WORDS_1 = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - - static final String[] WORDS_2 = new String[] { - "hi tim", "beauty", "hooray sue bob", - "hi there", "", "please say hi"}; - - static final String[] RESULTS = new String[] { - "beauty -> Tag1: Tag2: 1", - "bob -> Tag1: 2 Tag2: 1", - "hi -> Tag1: 5 Tag2: 3", - "hooray -> Tag1: Tag2: 1", - "please -> Tag1: Tag2: 1", - "say -> Tag1: Tag2: 1", - "sue -> Tag1: 2 Tag2: 1", - "there -> Tag1: 1 Tag2: 1", - "tim -> Tag1: Tag2: 1" - }; - - static final TupleTag<Long> tag1 = new TupleTag<>("Tag1"); - static final TupleTag<Long> tag2 = new TupleTag<>("Tag2"); - - protected String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); - } - - @Override - protected void testProgram() throws Exception { - Pipeline p = FlinkTestPipeline.createForBatch(); - - /* Create two PCollections and join them */ - PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - /* CoGroup the two collections */ - PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple - .of(tag1, occurences1) - .and(tag2, occurences2) - .apply(CoGroupByKey.<String>create()); - - /* Format output */ - mergedOccurences.apply(ParDo.of(new FormatCountsFn())) - .apply(TextIO.Write.named("test").to(resultPath)); - - p.run(); - } - - - static class ExtractWordsFn extends DoFn<String, String> { - - @Override - public void startBundle(Context c) { - } - - @Override - public void processElement(ProcessContext c) { - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> { - @Override - public void processElement(ProcessContext c) { - CoGbkResult value = c.element().getValue(); - String key = c.element().getKey(); - String countTag1 = tag1.getId() + ": "; - String countTag2 = tag2.getId() + ": "; - for (Long count : value.getAll(tag1)) { - countTag1 += count + " "; - } - for (Long count : value.getAll(tag2)) { - countTag2 += count; - } - c.output(key + " -> " + countTag1 + countTag2); - } - } - - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java deleted file mode 100644 index 60dc74a..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; - -import com.google.common.base.Joiner; - -import org.apache.flink.test.util.JavaProgramTestBase; - - -public class WordCountJoin3ITCase extends JavaProgramTestBase { - - static final String[] WORDS_1 = new String[] { - "hi there", "hi", "hi sue bob", - "hi sue", "", "bob hi"}; - - static final String[] WORDS_2 = new String[] { - "hi tim", "beauty", "hooray sue bob", - "hi there", "", "please say hi"}; - - static final String[] WORDS_3 = new String[] { - "hi stephan", "beauty", "hooray big fabian", - "hi yo", "", "please say hi"}; - - static final String[] RESULTS = new String[] { - "beauty -> Tag1: Tag2: 1 Tag3: 1", - "bob -> Tag1: 2 Tag2: 1 Tag3: ", - "hi -> Tag1: 5 Tag2: 3 Tag3: 3", - "hooray -> Tag1: Tag2: 1 Tag3: 1", - "please -> Tag1: Tag2: 1 Tag3: 1", - "say -> Tag1: Tag2: 1 Tag3: 1", - "sue -> Tag1: 2 Tag2: 1 Tag3: ", - "there -> Tag1: 1 Tag2: 1 Tag3: ", - "tim -> Tag1: Tag2: 1 Tag3: ", - "stephan -> Tag1: Tag2: Tag3: 1", - "yo -> Tag1: Tag2: Tag3: 1", - "fabian -> Tag1: Tag2: Tag3: 1", - "big -> Tag1: Tag2: Tag3: 1" - }; - - static final TupleTag<Long> tag1 = new TupleTag<>("Tag1"); - static final TupleTag<Long> tag2 = new TupleTag<>("Tag2"); - static final TupleTag<Long> tag3 = new TupleTag<>("Tag3"); - - protected String resultPath; - - @Override - protected void preSubmit() throws Exception { - resultPath = getTempDirPath("result"); - } - - @Override - protected void postSubmit() throws Exception { - compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath); - } - - @Override - protected void testProgram() throws Exception { - - Pipeline p = FlinkTestPipeline.createForBatch(); - - /* Create two PCollections and join them */ - PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3)) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Count.<String>perElement()); - - /* CoGroup the two collections */ - PCollection<KV<String, CoGbkResult>> mergedOccurences = KeyedPCollectionTuple - .of(tag1, occurences1) - .and(tag2, occurences2) - .and(tag3, occurences3) - .apply(CoGroupByKey.<String>create()); - - /* Format output */ - mergedOccurences.apply(ParDo.of(new FormatCountsFn())) - .apply(TextIO.Write.named("test").to(resultPath)); - - p.run(); - } - - - static class ExtractWordsFn extends DoFn<String, String> { - - @Override - public void startBundle(Context c) { - } - - @Override - public void processElement(ProcessContext c) { - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> { - @Override - public void processElement(ProcessContext c) { - CoGbkResult value = c.element().getValue(); - String key = c.element().getKey(); - String countTag1 = tag1.getId() + ": "; - String countTag2 = tag2.getId() + ": "; - String countTag3 = tag3.getId() + ": "; - for (Long count : value.getAll(tag1)) { - countTag1 += count + " "; - } - for (Long count : value.getAll(tag2)) { - countTag2 += count + " "; - } - for (Long count : value.getAll(tag3)) { - countTag3 += count; - } - c.output(key + " -> " + countTag1 + countTag2 + countTag3); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java index c76af65..3e5a17d 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java @@ -44,6 +44,7 @@ import org.apache.beam.sdk.values.KV; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.streaming.util.TestHarnessUtil; import org.joda.time.Duration; import org.joda.time.Instant; @@ -53,7 +54,7 @@ import java.util.Collection; import java.util.Comparator; import java.util.concurrent.ConcurrentLinkedQueue; -public class GroupAlsoByWindowTest { +public class GroupAlsoByWindowTest extends StreamingMultipleProgramsTestBase { private final Combine.CombineFn combiner = new Sum.SumIntegerFn(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java deleted file mode 100644 index e6b7f64..0000000 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.util; - -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.BigQueryIO; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; - -import com.google.api.services.bigquery.model.TableRow; - -/** - * Copied from {@link org.apache.beam.examples.JoinExamples} because the code - * is private there. - */ -public class JoinExamples { - - // A 1000-row sample of the GDELT data here: gdelt-bq:full.events. - private static final String GDELT_EVENTS_TABLE = - "clouddataflow-readonly:samples.gdelt_sample"; - // A table that maps country codes to country names. - private static final String COUNTRY_CODES = - "gdelt-bq:full.crosswalk_geocountrycodetohuman"; - - /** - * Join two collections, using country code as the key. - */ - public static PCollection<String> joinEvents(PCollection<TableRow> eventsTable, - PCollection<TableRow> countryCodes) throws Exception { - - final TupleTag<String> eventInfoTag = new TupleTag<>(); - final TupleTag<String> countryInfoTag = new TupleTag<>(); - - // transform both input collections to tuple collections, where the keys are country - // codes in both cases. - PCollection<KV<String, String>> eventInfo = eventsTable.apply( - ParDo.of(new ExtractEventDataFn())); - PCollection<KV<String, String>> countryInfo = countryCodes.apply( - ParDo.of(new ExtractCountryInfoFn())); - - // country code 'key' -> CGBKR (<event info>, <country name>) - PCollection<KV<String, CoGbkResult>> kvpCollection = KeyedPCollectionTuple - .of(eventInfoTag, eventInfo) - .and(countryInfoTag, countryInfo) - .apply(CoGroupByKey.<String>create()); - - // Process the CoGbkResult elements generated by the CoGroupByKey transform. - // country code 'key' -> string of <event info>, <country name> - PCollection<KV<String, String>> finalResultCollection = - kvpCollection.apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() { - @Override - public void processElement(ProcessContext c) { - KV<String, CoGbkResult> e = c.element(); - CoGbkResult val = e.getValue(); - String countryCode = e.getKey(); - String countryName; - countryName = e.getValue().getOnly(countryInfoTag, "Kostas"); - for (String eventInfo : c.element().getValue().getAll(eventInfoTag)) { - // Generate a string that combines information from both collection values - c.output(KV.of(countryCode, "Country name: " + countryName - + ", Event info: " + eventInfo)); - } - } - })); - - // write to GCS - return finalResultCollection - .apply(ParDo.of(new DoFn<KV<String, String>, String>() { - @Override - public void processElement(ProcessContext c) { - String outputstring = "Country code: " + c.element().getKey() - + ", " + c.element().getValue(); - c.output(outputstring); - } - })); - } - - /** - * Examines each row (event) in the input table. Output a KV with the key the country - * code of the event, and the value a string encoding event information. - */ - static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> { - @Override - public void processElement(ProcessContext c) { - TableRow row = c.element(); - String countryCode = (String) row.get("ActionGeo_CountryCode"); - String sqlDate = (String) row.get("SQLDATE"); - String actor1Name = (String) row.get("Actor1Name"); - String sourceUrl = (String) row.get("SOURCEURL"); - String eventInfo = "Date: " + sqlDate + ", Actor1: " + actor1Name + ", url: " + sourceUrl; - c.output(KV.of(countryCode, eventInfo)); - } - } - - - /** - * Examines each row (country info) in the input table. Output a KV with the key the country - * code, and the value the country name. - */ - static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> { - @Override - public void processElement(ProcessContext c) { - TableRow row = c.element(); - String countryCode = (String) row.get("FIPSCC"); - String countryName = (String) row.get("HumanName"); - c.output(KV.of(countryCode, countryName)); - } - } - - - /** - * Options supported by {@link JoinExamples}. - * <p> - * Inherits standard configuration options. - */ - private interface Options extends PipelineOptions { - @Description("Path of the file to write to") - @Validation.Required - String getOutput(); - void setOutput(String value); - } - - public static void main(String[] args) throws Exception { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - Pipeline p = Pipeline.create(options); - // the following two 'applys' create multiple inputs to our pipeline, one for each - // of our two input sources. - PCollection<TableRow> eventsTable = p.apply(BigQueryIO.Read.from(GDELT_EVENTS_TABLE)); - PCollection<TableRow> countryCodes = p.apply(BigQueryIO.Read.from(COUNTRY_CODES)); - PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes); - formattedResults.apply(TextIO.Write.to(options.getOutput())); - p.run(); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bfca23/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java index 2ca7014..29240e7 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/UnionCoder.java @@ -35,7 +35,7 @@ import java.util.List; /** * A UnionCoder encodes RawUnionValues. */ -class UnionCoder extends StandardCoder<RawUnionValue> { +public class UnionCoder extends StandardCoder<RawUnionValue> { // TODO: Think about how to integrate this with a schema object (i.e. // a tuple of tuple tags). /**