Repository: beam
Updated Branches:
  refs/heads/DSL_SQL f59dccc51 -> 3e25ffb04


Update BeamSqlExample:
- Fix mvn example
- Add aggregation/group by


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ee1f97bb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ee1f97bb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ee1f97bb

Branch: refs/heads/DSL_SQL
Commit: ee1f97bb4bf5baa1810a787d0b0e68f314e7c110
Parents: f59dccc
Author: mingmxu <ming...@ebay.com>
Authored: Tue Aug 8 21:52:54 2017 -0700
Committer: Tyler Akidau <taki...@apache.org>
Committed: Wed Aug 9 09:50:37 2017 -0700

----------------------------------------------------------------------
 .../extensions/sql/example/BeamSqlExample.java  | 24 ++++++++++++--------
 1 file changed, 15 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ee1f97bb/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
index 3a46acc..91251cf 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
@@ -39,11 +39,10 @@ import org.apache.beam.sdk.values.TupleTag;
  *
  * <p>Run the example with
  * <pre>
- * mvn -pl dsls/sql compile exec:java \
- *  -Dexec.mainClass=BeamSqlExample \
+ * mvn -pl sdks/java/extensions/sql \
+ *   compile exec:java 
-Dexec.mainClass=org.apache.beam.sdk.extensions.sql.example.BeamSqlExample \
  *   -Dexec.args="--runner=DirectRunner" -Pdirect-runner
  * </pre>
- *
  */
 class BeamSqlExample {
   public static void main(String[] args) throws Exception {
@@ -54,21 +53,26 @@ class BeamSqlExample {
     List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
     List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, 
Types.DOUBLE);
     BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
-    BeamRecord row = new BeamRecord(type, 1, "row", 1.0);
+    BeamRecord row1 = new BeamRecord(type, 1, "row", 1.0);
+    BeamRecord row2 = new BeamRecord(type, 2, "row", 2.0);
+    BeamRecord row3 = new BeamRecord(type, 3, "row", 3.0);
 
     //create a source PCollection with Create.of();
-    PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row)
+    PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row1, 
row2, row3)
         .withCoder(type.getRecordCoder()));
 
     //Case 1. run a simple SQL query over input PCollection with 
BeamSql.simpleQuery;
     PCollection<BeamRecord> outputStream = inputTable.apply(
-        BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1=1"));
+        BeamSql.simpleQuery("select c1, c2, c3 from PCOLLECTION where c1 > 
1"));
 
     //print the output record of case 1;
     outputStream.apply("log_result",
         MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, 
Void>() {
       public Void apply(BeamRecord input) {
-        System.out.println("PCOLLECTION: " + input);
+        //expect output:
+        //  PCOLLECTION: [3, row, 3.0]
+        //  PCOLLECTION: [2, row, 2.0]
+        System.out.println("PCOLLECTION: " + input.getDataValues());
         return null;
       }
     }));
@@ -76,14 +80,16 @@ class BeamSqlExample {
     //Case 2. run the query with BeamSql.query over result PCollection of case 
1.
     PCollection<BeamRecord> outputStream2 =
         PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), 
outputStream)
-        .apply(BeamSql.query("select c2, c3 from CASE1_RESULT where c1=1"));
+        .apply(BeamSql.query("select c2, sum(c3) from CASE1_RESULT group by 
c2"));
 
     //print the output record of case 2;
     outputStream2.apply("log_result",
         MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, 
Void>() {
       @Override
       public Void apply(BeamRecord input) {
-        System.out.println("TABLE_B: " + input);
+        //expect output:
+        //  CASE1_RESULT: [row, 5.0]
+        System.out.println("CASE1_RESULT: " + input.getDataValues());
         return null;
       }
     }));

Reply via email to