Repository: beam Updated Branches: refs/heads/DSL_SQL 3e25ffb04 -> be01be5cb
take SerializableFunction as UDF. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ffbd4c2f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ffbd4c2f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ffbd4c2f Branch: refs/heads/DSL_SQL Commit: ffbd4c2f7ae0a608f061e718aefa23d3349e34a5 Parents: 3e25ffb Author: mingmxu <ming...@ebay.com> Authored: Tue Aug 8 23:02:32 2017 -0700 Committer: Tyler Akidau <taki...@apache.org> Committed: Wed Aug 9 13:27:20 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/extensions/sql/BeamSql.java | 17 +++++++++++++++++ .../apache/beam/sdk/extensions/sql/BeamSqlEnv.java | 9 +++++++++ .../interpreter/operator/BeamSqlUdfExpression.java | 4 +++- .../sdk/extensions/sql/BeamSqlDslUdfUdafTest.java | 13 ++++++++++++- 4 files changed, 41 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/ffbd4c2f/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java index ac617ad..a1e9877 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSql.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.BeamRecord; import 
org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -144,6 +145,14 @@ public class BeamSql { getSqlEnv().registerUdf(functionName, clazz); return this; } + /** + * Register a {@link SerializableFunction} as a UDF used in this query. + * Note: {@code sfn}'s class must have a public no-arg constructor; its state is not kept. + */ + public QueryTransform withUdf(String functionName, SerializableFunction sfn){ + getSqlEnv().registerUdf(functionName, sfn); + return this; + } /** * register a UDAF function used in this query. @@ -213,6 +222,14 @@ public class BeamSql { getSqlEnv().registerUdf(functionName, clazz); return this; } + /** + * Register a {@link SerializableFunction} as a UDF used in this query. + * Note: {@code sfn}'s class must have a public no-arg constructor; its state is not kept. + */ + public SimpleQueryTransform withUdf(String functionName, SerializableFunction sfn){ + getSqlEnv().registerUdf(functionName, sfn); + return this; + } /** * register a UDAF function used in this query. 
http://git-wip-us.apache.org/repos/asf/beam/blob/ffbd4c2f/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java index 4d21425..0737c49 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/BeamSqlEnv.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BaseBeamTable; import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.calcite.DataContext; import org.apache.calcite.linq4j.Enumerable; import org.apache.calcite.rel.type.RelDataType; @@ -60,6 +61,14 @@ public class BeamSqlEnv implements Serializable{ } /** + * Register a {@link SerializableFunction} as a UDF which can be used in SQL expressions. + * Note: {@code sfn}'s class must have a public no-arg constructor; its state is not kept. + */ + public void registerUdf(String functionName, SerializableFunction sfn) { + schema.add(functionName, ScalarFunctionImpl.create(sfn.getClass(), "apply")); + } + + /** * Register a UDAF function which can be used in GROUP-BY expression. * See {@link BeamSqlUdaf} on how to implement a UDAF. 
*/ http://git-wip-us.apache.org/repos/asf/beam/blob/ffbd4c2f/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java index f1bcb66..123e6a0 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/interpreter/operator/BeamSqlUdfExpression.java @@ -30,6 +30,7 @@ import org.apache.calcite.sql.type.SqlTypeName; public class BeamSqlUdfExpression extends BeamSqlExpression { //as Method is not Serializable, need to keep class/method information, and rebuild it. 
private transient Method method; + private transient Object udfIns; private String className; private String methodName; private List<String> paraClassName = new ArrayList<>(); @@ -63,7 +64,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression { } return BeamSqlPrimitive.of(getOutputType(), - method.invoke(null, paras.toArray(new Object[]{}))); + method.invoke(udfIns, paras.toArray(new Object[]{}))); } catch (Exception ex) { throw new RuntimeException(ex); } @@ -78,6 +79,7 @@ public class BeamSqlUdfExpression extends BeamSqlExpression { for (String pc : paraClassName) { paraClass.add(Class.forName(pc)); } + udfIns = Class.forName(className).newInstance(); method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {})); } catch (Exception e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/beam/blob/ffbd4c2f/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java index 7302376..0552cbf 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlDslUdfUdafTest.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.extensions.sql.schema.BeamRecordSqlType; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdaf; import org.apache.beam.sdk.extensions.sql.schema.BeamSqlUdf; import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.BeamRecord; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; @@ -82,7 +83,7 @@ public 
class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { PCollection<BeamRecord> result2 = PCollectionTuple.of(new TupleTag<BeamRecord>("PCOLLECTION"), boundedInput1) .apply("testUdf2", - BeamSql.query(sql2).withUdf("cubic2", CubicInteger.class)); + BeamSql.query(sql2).withUdf("cubic2", new CubicIntegerFn())); PAssert.that(result2).containsInAnyOrder(record); pipeline.run().waitUntilFinish(); @@ -131,4 +132,14 @@ public class BeamSqlDslUdfUdafTest extends BeamSqlDslBase { return input * input * input; } } + + /** + * An example UDF implemented as a {@link SerializableFunction}. + */ + public static class CubicIntegerFn implements SerializableFunction<Integer, Integer> { + @Override + public Integer apply(Integer input) { + return input * input * input; + } + } }