Repository: beam
Updated Branches:
  refs/heads/DSL_SQL 3e25ffb04 -> be01be5cb


Take SerializableFunction as UDF.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ffbd4c2f
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ffbd4c2f
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ffbd4c2f

Branch: refs/heads/DSL_SQL
Commit: ffbd4c2f7ae0a608f061e718aefa23d3349e34a5
Parents: 3e25ffb
Author: mingmxu <ming...@ebay.com>
Authored: Tue Aug 8 23:02:32 2017 -0700
Committer: Tyler Akidau <taki...@apache.org>
Committed: Wed Aug 9 13:27:20 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/sdk/extensions/sql/BeamSql.java    | 17 +++++++++++++++++
 .../apache/beam/sdk/extensions/sql/BeamSqlEnv.java |  9 +++++++++
 .../interpreter/operator/BeamSqlUdfExpression.java |  4 +++-
 .../sdk/extensions/sql/BeamSqlDslUdfUdafTest.java  | 13 ++++++++++++-
 4 files changed, 41 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ffbd4c2f/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
index ac617ad..a1e9877 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java
@@ -26,6 +26,7 @@ import 
org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -144,6 +145,14 @@ public class BeamSql {
        getSqlEnv().registerUdf(functionName, clazz);
        return this;
      }
+     /**
+      * Register a {@link SerializableFunction} as a UDF function used in this 
query.
+      * Note: the {@link SerializableFunction} class must have a constructor without 
arguments.
+      */
+      public QueryTransform withUdf(String functionName, SerializableFunction 
sfn){
+        getSqlEnv().registerUdf(functionName, sfn);
+        return this;
+      }
 
      /**
       * register a UDAF function used in this query.
@@ -213,6 +222,14 @@ public class BeamSql {
        getSqlEnv().registerUdf(functionName, clazz);
        return this;
      }
+     /**
+      * Register a {@link SerializableFunction} as a UDF function used in this 
query.
+      * Note: the {@link SerializableFunction} class must have a constructor without 
arguments.
+      */
+      public SimpleQueryTransform withUdf(String functionName, 
SerializableFunction sfn){
+        getSqlEnv().registerUdf(functionName, sfn);
+        return this;
+      }
 
      /**
       * register a UDAF function used in this query.

http://git-wip-us.apache.org/repos/asf/beam/blob/ffbd4c2f/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
index 4d21425..0737c49 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java
@@ -24,6 +24,7 @@ import 
org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable;
 import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.calcite.DataContext;
 import org.apache.calcite.linq4j.Enumerable;
 import org.apache.calcite.rel.type.RelDataType;
@@ -60,6 +61,14 @@ public class BeamSqlEnv implements Serializable{
   }
 
   /**
+   * Register a {@link SerializableFunction} as a UDF function which can be used 
in a SQL expression.
+   * Note: only the class of {@code sfn} is registered, so that class must have a 
constructor without arguments.
+   */
+  public void registerUdf(String functionName, SerializableFunction sfn) {
+    schema.add(functionName, ScalarFunctionImpl.create(sfn.getClass(), 
"apply"));
+  }
+
+  /**
    * Register a UDAF function which can be used in GROUP-BY expression.
    * See {@link BeamSqlUdaf} on how to implement a UDAF.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/ffbd4c2f/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
index f1bcb66..123e6a0 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java
@@ -30,6 +30,7 @@ import org.apache.calcite.sql.type.SqlTypeName;
 public class BeamSqlUdfExpression extends BeamSqlExpression {
   //as Method is not Serializable, need to keep class/method information, and 
rebuild it.
   private transient Method method;
+  private transient Object udfIns;
   private String className;
   private String methodName;
   private List<String> paraClassName = new ArrayList<>();
@@ -63,7 +64,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
       }
 
       return BeamSqlPrimitive.of(getOutputType(),
-          method.invoke(null, paras.toArray(new Object[]{})));
+          method.invoke(udfIns, paras.toArray(new Object[]{})));
     } catch (Exception ex) {
       throw new RuntimeException(ex);
     }
@@ -78,6 +79,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression {
       for (String pc : paraClassName) {
         paraClass.add(Class.forName(pc));
       }
+      udfIns = Class.forName(className).newInstance();
       method = Class.forName(className).getMethod(methodName, 
paraClass.toArray(new Class<?>[] {}));
     } catch (Exception e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/beam/blob/ffbd4c2f/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
index 7302376..0552cbf 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java
@@ -24,6 +24,7 @@ import 
org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf;
 import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.values.BeamRecord;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionTuple;
@@ -82,7 +83,7 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
     PCollection<BeamRecord> result2 =
         PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), 
boundedInput1)
         .apply("testUdf2",
-            BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class));
+            BeamSql.query(sql2).withUdf("cubic2", new CubicIntegerFn()));
     PAssert.that(result2).containsInAnyOrder(record);
 
     pipeline.run().waitUntilFinish();
@@ -131,4 +132,14 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase {
       return input * input * input;
     }
   }
+
+  /**
+   * An example UDF with {@link SerializableFunction}.
+   */
+  public static class CubicIntegerFn implements SerializableFunction<Integer, 
Integer> {
+    @Override
+    public Integer apply(Integer input) {
+      return input * input * input;
+    }
+  }
 }

Reply via email to