Repository: beam Updated Branches: refs/heads/DSL_SQL f59dccc51 -> 3e25ffb04
Update BeamSqlExample: - Fix mvn example - Add aggregation/group by Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee1f97bb Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee1f97bb Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee1f97bb Branch: refs/heads/DSL_SQL Commit: ee1f97bb4bf5baa1810a787d0b0e68f314e7c110 Parents: f59dccc Author: mingmxu <ming...@ebay.com> Authored: Tue Aug 8 21:52:54 2017 -0700 Committer: Tyler Akidau <taki...@apache.org> Committed: Wed Aug 9 09:50:37 2017 -0700 ---------------------------------------------------------------------- .../extensions/sql/example/BeamSqlExample.java | 24 ++++++++++++-------- 1 file changed, 15 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ee1f97bb/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java index 3a46acc..91251cf 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java @@ -39,11 +39,10 @@ import org.apache.beam.sdk.values.TupleTag; * * <p>Run the example with * <pre> - * mvn -pl dsls/sql compile exec:java \ - * -Dexec.mainClass=BeamSqlExample \ + * mvn -pl sdks/java/extensions/sql \ + * compile exec:java -Dexec.mainClass=org.apache.beam.sdk.extensions.sql.example.BeamSqlExample \ * -Dexec.args="--runner=DirectRunner" -Pdirect-runner * </pre> - * */ class BeamSqlExample { public static void main(String[] args) throws Exception { @@ -54,21 +53,26 @@ class BeamSqlExample { List<String> fieldNames = Arrays.asList("c1", "c2", "c3"); List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE); BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes); - BeamRecord row = new BeamRecord(type, 1, "row", 1.0); + BeamRecord row1 = new BeamRecord(type, 1, "row", 1.0); + BeamRecord row2 = new BeamRecord(type, 2, "row", 2.0); + BeamRecord row3 = new BeamRecord(type, 3, "row", 3.0); //create a source PCollection with Create.of(); - PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row) + PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row1, row2, row3) .withCoder(type.getRecordCoder())); //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery; PCollection<BeamRecord> outputStream = inputTable.apply( - BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1")); + BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1 > 1")); //print the output record of case 1; outputStream.apply("log_result", MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() { public Void apply(BeamRecord input) { - System.out.println("PCOLLECTION: " + input); + //expect output: + // PCOLLECTION: [3, row, 3.0] + // PCOLLECTION: [2, row, 2.0] + System.out.println("PCOLLECTION: " + input.getDataValues()); return null; } })); @@ -76,14 +80,16 @@ class BeamSqlExample { //Case 2. run the query with BeamSql.query over result PCollection of case 1. PCollection<BeamRecord> outputStream2 = PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream) - .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1")); + .apply(BeamSql.query("select c2, sum(c3) from CASE1_RESULT group by c2")); //print the output record of case 2; outputStream2.apply("log_result", MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() { @Override public Void apply(BeamRecord input) { - System.out.println("TABLE_B: " + input); + //expect output: + // CASE1_RESULT: [row, 5.0] + System.out.println("CASE1_RESULT: " + input.getDataValues()); return null; } }));