support UDF
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95cba796 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95cba796 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95cba796 Branch: refs/heads/DSL_SQL Commit: 95cba7960552ebfbb594aef1edb775906aec8a79 Parents: 0e08c87 Author: mingmxu <ming...@ebay.com> Authored: Sat May 13 21:42:05 2017 -0700 Committer: mingmxu <ming...@ebay.com> Committed: Mon May 15 20:40:37 2017 -0700 ---------------------------------------------------------------------- .../dsls/sql/interpreter/BeamSQLFnExecutor.java | 13 ++- .../operator/BeamSqlUdfExpression.java | 86 ++++++++++++++++++++ .../beam/dsls/sql/planner/BeamQueryPlanner.java | 5 +- .../beam/dsls/sql/planner/BeamSqlRunner.java | 12 +++ .../operator/BeamSqlUdfExpressionTest.java | 51 ++++++++++++ .../sql/planner/BeamGroupByExplainTest.java | 11 +++ .../sql/planner/BeamGroupByPipelineTest.java | 12 +++ 7 files changed, 188 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java index 4b7af2a..eb9fedf 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/BeamSQLFnExecutor.java @@ -35,6 +35,7 @@ import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlLessThanExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlNotEqualExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlOrExpression; import 
org.apache.beam.dsls.sql.interpreter.operator.BeamSqlPrimitive; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowEndExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowExpression; import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlWindowStartExpression; @@ -60,7 +61,9 @@ import org.apache.calcite.rex.RexCall; import org.apache.calcite.rex.RexInputRef; import org.apache.calcite.rex.RexLiteral; import org.apache.calcite.rex.RexNode; +import org.apache.calcite.schema.impl.ScalarFunctionImpl; import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.validate.SqlUserDefinedFunction; /** * Executor based on {@link BeamSqlExpression} and {@link BeamSqlPrimitive}. @@ -181,7 +184,15 @@ public class BeamSQLFnExecutor implements BeamSQLExpressionExecutor { case "SESSION_END": return new BeamSqlWindowEndExpression(); default: - throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!"); + //handle UDF + if (((RexCall) rexNode).getOperator() instanceof SqlUserDefinedFunction) { + SqlUserDefinedFunction udf = (SqlUserDefinedFunction) ((RexCall) rexNode).getOperator(); + ScalarFunctionImpl fn = (ScalarFunctionImpl) udf.getFunction(); + return new BeamSqlUdfExpression(fn.method, subExps, + ((RexCall) rexNode).type.getSqlTypeName()); + } else { + throw new BeamSqlUnsupportedException("Operator: " + opName + " not supported yet!"); + } } } else { throw new BeamSqlUnsupportedException( http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java new file 
mode 100644 index 0000000..d6cf987 --- /dev/null +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpression.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.dsls.sql.schema.BeamSQLRow; +import org.apache.calcite.sql.type.SqlTypeName; + +/** + * invoke a UDF function. + */ +public class BeamSqlUdfExpression extends BeamSqlExpression { + //as Method is not Serializable, need to keep class/method information, and rebuild it.
+ private transient Method method; + private String className; + private String methodName; + private List<String> paraClassName = new ArrayList<>(); + + public BeamSqlUdfExpression(Method method, List<BeamSqlExpression> subExps, + SqlTypeName sqlTypeName) { + super(subExps, sqlTypeName); + this.method = method; + + this.className = method.getDeclaringClass().getName(); + this.methodName = method.getName(); + for (Class<?> c : method.getParameterTypes()) { + paraClassName.add(c.getName()); + } + } + + @Override + public boolean accept() { + return true; + } + + @Override + public BeamSqlPrimitive evaluate(BeamSQLRow inputRecord) { + if (method == null) { + reConstructMethod(); + } + try { + List<Object> paras = new ArrayList<>(); + for (BeamSqlExpression e : getOperands()) { + paras.add(e.evaluate(inputRecord).getValue()); + } + + return BeamSqlPrimitive.of(getOutputType(), + method.invoke(null, paras.toArray(new Object[]{}))); + } catch (Exception ex) { + throw new RuntimeException(ex); + } + } + + /** + * re-construct method from class/method. 
+ */ + private void reConstructMethod() { + try { + List<Class<?>> paraClass = new ArrayList<>(); + for (String pc : paraClassName) { + paraClass.add(Class.forName(pc)); + } + method = Class.forName(className).getMethod(methodName, paraClass.toArray(new Class<?>[] {})); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java index 29b3f1d..9e41555 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamQueryPlanner.java @@ -48,6 +48,7 @@ import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; +import org.apache.calcite.sql.util.ChainedSqlOperatorTable; import org.apache.calcite.tools.FrameworkConfig; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.Planner; @@ -83,7 +84,9 @@ public class BeamQueryPlanner { FrameworkConfig config = Frameworks.newConfigBuilder() .parserConfig(SqlParser.configBuilder().setLex(Lex.MYSQL).build()).defaultSchema(schema) .traitDefs(traitDefs).context(Contexts.EMPTY_CONTEXT).ruleSets(BeamRuleSets.getRuleSets()) - .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM).build(); + .costFactory(null).typeSystem(BeamRelDataTypeSystem.BEAM_REL_DATATYPE_SYSTEM) + .operatorTable(new ChainedSqlOperatorTable(sqlOperatorTables)) + .build(); this.planner = Frameworks.getPlanner(config); for (String t : schema.getTableNames()) { 
http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java index 708c507..95ba5a9 100644 --- a/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java +++ b/dsls/sql/src/main/java/org/apache/beam/dsls/sql/planner/BeamSqlRunner.java @@ -24,6 +24,7 @@ import org.apache.beam.dsls.sql.schema.BaseBeamTable; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.schema.Schema; import org.apache.calcite.schema.SchemaPlus; +import org.apache.calcite.schema.impl.ScalarFunctionImpl; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.tools.Frameworks; import org.apache.calcite.tools.RelConversionException; @@ -65,6 +66,17 @@ public class BeamSqlRunner implements Serializable { } /** + * Add a UDF function. + * + * <p>There are two requirements for function {@code methodName}:<br> + * 1. It must be a STATIC method;<br> + * 2. For a primitive parameter, use its wrapper class and handle NULL properly; + */ + public void addUDFFunction(String functionName, Class<?> className, String methodName){ + schema.add(functionName, ScalarFunctionImpl.create(className, methodName)); + } + + /** * submit as a Beam pipeline.
* */ http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java new file mode 100644 index 0000000..71ac523 --- /dev/null +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/interpreter/operator/BeamSqlUdfExpressionTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.dsls.sql.interpreter.operator; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.dsls.sql.interpreter.BeamSQLFnExecutorTestBase; +import org.apache.calcite.sql.type.SqlTypeName; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test for BeamSqlUdfExpression. 
+ */ +public class BeamSqlUdfExpressionTest extends BeamSQLFnExecutorTestBase { + + @Test + public void testUdf() throws NoSuchMethodException, SecurityException { + List<BeamSqlExpression> operands = new ArrayList<>(); + operands.add(BeamSqlPrimitive.of(SqlTypeName.INTEGER, 10)); + + BeamSqlUdfExpression exp = new BeamSqlUdfExpression( + UdfFn.class.getMethod("negative", Integer.class), operands, SqlTypeName.INTEGER); + + Assert.assertEquals(-10, exp.evaluate(record).getValue()); + } + + /** + * UDF example. + */ + public static final class UdfFn { + public static int negative(Integer number) { + return number == null ? 0 : 0 - number; + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java index 566c574..4f2b1ba 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByExplainTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.dsls.sql.planner; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest; import org.junit.Test; /** @@ -90,4 +91,14 @@ public class BeamGroupByExplainTest extends BasePlanner { String plan = runner.explainQuery(sql); } + /** + * Query with UDF. 
+ */ + @Test + public void testUdf() throws Exception { + runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative"); + String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS"; + + String plan = runner.explainQuery(sql); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/95cba796/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java ---------------------------------------------------------------------- diff --git a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java index d5f8125..71dcf73 100644 --- a/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java +++ b/dsls/sql/src/test/java/org/apache/beam/dsls/sql/planner/BeamGroupByPipelineTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.dsls.sql.planner; +import org.apache.beam.dsls.sql.interpreter.operator.BeamSqlUdfExpressionTest; import org.apache.beam.sdk.Pipeline; import org.junit.Test; @@ -91,4 +92,15 @@ public class BeamGroupByPipelineTest extends BasePlanner { Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); } + /** + * Query with UDF. + */ + @Test + public void testUdf() throws Exception { + runner.addUDFFunction("negative", BeamSqlUdfExpressionTest.UdfFn.class, "negative"); + String sql = "select site_id, negative(site_id) as nsite_id from ORDER_DETAILS"; + + Pipeline pipeline = runner.getPlanner().compileBeamPipeline(sql); + } + }