[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397920958

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java ##

@@ -129,6 +125,10 @@ public FlinkTypeFactory getTypeFactory() {
         return typeFactory;
     }
+    public FunctionIdentifier getIdentifier() {

Review comment:
nit: return Optional

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services
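The nit above could be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual Flink code: `InlineAwareFunction` and its plain `String` identifier are hypothetical stand-ins for `BridgingSqlFunction` and `FunctionIdentifier`, and the assumption is that the identifier is absent for inline functions.

```java
import java.util.Optional;

/** Hypothetical stand-in for a function wrapper whose identifier may be absent. */
final class InlineAwareFunction {

    // Assumption: null means the function was used inline and never registered.
    private final String identifier;

    InlineAwareFunction(String identifier) {
        this.identifier = identifier;
    }

    /** Wraps the nullable field in an Optional so callers must handle the inline case. */
    Optional<String> getIdentifier() {
        return Optional.ofNullable(identifier);
    }
}
```

Returning `Optional` keeps the "no identifier" case visible in the signature instead of relying on callers to remember a null check.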
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397919433

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java ##

@@ -52,8 +52,15 @@
  * Utilities for bridging {@link FunctionDefinition} with Calcite's representation of functions.
  */
 final class BridgingUtils {
+    static String createName(FunctionIdentifier identifier, FunctionDefinition definition) {

Review comment:
nit: add `@Nullable` to all arguments in this util
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397899170

## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java ##

@@ -110,9 +114,11 @@ public static BridgingSqlFunction of(
             dataTypeFactory,
             typeFactory,
             kind,
-            identifier,
+            identifier != null ? createName(identifier) : createInlineFunctionName(definition),

Review comment:
Can we integrate the case distinction into the methods `createName()`, `createSqlIdentifier()`, and `createSqlFunctionCategory()` directly? First of all, I'm not a big fan of inline if/else and, more importantly, we need the same logic for `BridgingSqlAggFunction`, which is why the `BridgingUtils` class exists.
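One way to read the request above is to move the null check into the shared utility so that every caller gets the same behavior. The following is only a sketch under stated assumptions: `NamingUtils` is a hypothetical class, the identifier is modeled as a nullable `String`, the definition as a plain `Object`, and the naming scheme for inline functions is invented for illustration.

```java
import java.util.Objects;

/** Hypothetical sketch of a shared naming utility; not the actual BridgingUtils. */
final class NamingUtils {

    private NamingUtils() {
    }

    /**
     * Returns the registered name if an identifier exists, otherwise derives a
     * name for an inline function from its definition.
     *
     * @param identifier registered name, or null for inline functions (assumption)
     * @param definition the function definition; only its class name is used here
     */
    static String createName(String identifier, Object definition) {
        Objects.requireNonNull(definition, "definition");
        if (identifier != null) {
            return identifier;
        }
        return createInlineFunctionName(definition);
    }

    /** Derives a placeholder name for an inline function (scheme is an assumption). */
    static String createInlineFunctionName(Object definition) {
        return "*" + definition.getClass().getSimpleName() + "*";
    }
}
```

With the case distinction inside `createName`, a call site for scalar functions and one for aggregate functions would both simply pass the possibly-null identifier along.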
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397888975 ## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/table/FunctionITCase.java ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.table; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.api.Expressions.call; + +/** + * Tests for user defined functions in the Table API. + */ +public class FunctionITCase extends AbstractTestBase { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testDataTypeBasedTypeInferenceNotSupported() throws Exception { + thrown.expect(ValidationException.class); + thrown.expectMessage("The new type inference for functions is only supported in the Blink planner."); + + StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecEnvironment); + + Table table = tableEnvironment + .sqlQuery("SELECT * FROM (VALUES (1)) AS TableName(f0)") + .select(call(new SimpleScalarFunction(), $("f0"))); + tableEnvironment.toAppendStream(table, Row.class).print(); + + streamExecEnvironment.execute(); + } + + @Test + public void testDataTypeBasedTypeInferenceNotSupportedInLateralJoin() throws Exception { + thrown.expect(ValidationException.class); + thrown.expectMessage("The new type inference for functions is only supported in the Blink planner."); + + StreamExecutionEnvironment 
streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecEnvironment);
+
+        Table table = tableEnvironment
+            .sqlQuery("SELECT * FROM (VALUES ('A,B,C')) AS TableName(f0)")
+            .joinLateral(call(new SimpleTableFunction(), $("f0")).as("a", "b"))
+            .select($("a"), $("b"));
+        tableEnvironment.toAppendStream(table, Row.class).print();
+
+        streamExecEnvironment.execute();
+    }
+
+    /**
+     * Scalar function that uses new type inference stack.
+     */
+    public static class SimpleScalarFunction extends ScalarFunction {
+        public long eval(Integer i) {
+            return i;
+        }
+    }
+
+    /**

Review comment:
I think the function has too much code for just testing an exception. It is dead code.
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397888192

## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java ##

@@ -444,6 +450,31 @@ private void testUserDefinedCatalogFunction(TableEnvironment tableEnv, String cr
         tableEnv.sqlUpdate("drop table t2");
     }
+    @Test
+    public void testDataTypeBasedTypeInferenceNotSupported() throws Exception {
+        thrown.expect(ValidationException.class);
+        thrown.expectMessage("The new type inference for functions is only supported in the Blink planner.");
+
+        StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecEnvironment);
+
+        tableEnvironment.createTemporarySystemFunction("func", SimpleScalarFunction.class);
+        Table table = tableEnvironment
+            .sqlQuery("SELECT func(1)");
+        tableEnvironment.toAppendStream(table, Row.class).print();
+
+        streamExecEnvironment.execute();
+    }
+
+    /**

Review comment:
True, but there we really tested different kinds of functions. The comment has nothing to do with the new type inference. But I don't have a strong opinion there.
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397884272

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java ##

@@ -198,4 +204,48 @@ public static boolean isFunction(Expression expression, BuiltInFunctionDefinitio
         }
         return false;
     }
+
+    /**
+     * Extracts a {@link FunctionIdentifier} for the given {@link CallExpression}. If the call is an inline funcion
+     * ({@link CallExpression#getFunctionIdentifier()} returns empty)
+     *
+     * it uses {@link BuiltInFunctionDefinition#getName()} ()} for built in functions
+     * it uses {@link UserDefinedFunction#functionIdentifier()} for user defined functions
+     * it uses {@link FunctionDefinition#toString()} ()} for any other functions
+     *
+     */
+    public static FunctionIdentifier getFunctionIdentifier(CallExpression callExpression) {
+        if (callExpression.getFunctionIdentifier().isPresent()) {
+            return callExpression.getFunctionIdentifier().get();
+        } else {
+            return getInlineFunctionIdentifier(callExpression);
+        }
+    }
+
+    private static FunctionIdentifier getInlineFunctionIdentifier(CallExpression callExpression) {
+        FunctionDefinition functionDefinition = callExpression.getFunctionDefinition();
+        if (functionDefinition instanceof BuiltInFunctionDefinition) {
+            return FunctionIdentifier.of(((BuiltInFunctionDefinition) functionDefinition).getName());

Review comment:
After an offline discussion, we concluded that all functions must come from the catalog manager and thus have an identifier, except for inline functions.
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397276934 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/UserDefinedFunctionConvertRule.java ## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.expressions.converter; + +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; + +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A call expression converter rule that converts calls to user defined functions. 
+ */
+public class UserDefinedFunctionConvertRule implements CallExpressionConvertRule {
+    @Override
+    public Optional<RexNode> convert(
+            CallExpression call,
+            ConvertContext context) {
+        if (!(call.getFunctionDefinition() instanceof UserDefinedFunction)) {

Review comment:
Or can this be a `FunctionDefinitionConvertRule`?
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397251630

## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java ##

@@ -471,9 +471,14 @@ public QueryOperation flatMap(Expression tableFunction, QueryOperation child) {
             throw new ValidationException("Only a table function can be used in the flatMap operator.");
         }
-        TypeInformation resultType = ((TableFunctionDefinition) ((UnresolvedCallExpression) resolvedTableFunction)
-            .getFunctionDefinition())
-            .getResultType();
+        FunctionDefinition functionDefinition = ((UnresolvedCallExpression) resolvedTableFunction)
+            .getFunctionDefinition();
+        if (!(functionDefinition instanceof TableFunctionDefinition)) {

Review comment:
Please add a follow-up issue in FLINK-13191.
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397278876 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java ## @@ -270,36 +273,69 @@ public RelNode visit(SortQueryOperation sort) { } @Override - public RelNode visit(CalculatedQueryOperation calculatedTable) { - DataType resultType = fromLegacyInfoToDataType(calculatedTable.getResultType()); - TableFunction tableFunction = calculatedTable.getTableFunction(); - String[] fieldNames = calculatedTable.getTableSchema().getFieldNames(); - - TypedFlinkTableFunction function = new TypedFlinkTableFunction( - tableFunction, fieldNames, resultType); - + public RelNode visit(CalculatedQueryOperation calculatedTable) { + FunctionDefinition functionDefinition = calculatedTable.getFunctionDefinition(); + List parameters = convertToRexNodes(calculatedTable.getParameters()); FlinkTypeFactory typeFactory = relBuilder.getTypeFactory(); + if (functionDefinition instanceof TableFunctionDefinition) { + return convertLegacyTableFunction( + calculatedTable, + (TableFunctionDefinition) functionDefinition, + parameters, + typeFactory); + } - TableSqlFunction sqlFunction = new TableSqlFunction( - FunctionIdentifier.of(tableFunction.functionIdentifier()), - tableFunction.toString(), - tableFunction, - resultType, + DataTypeFactory dataTypeFactory = relBuilder.getCluster() + .getPlanner() + .getContext() + .unwrap(FlinkContext.class) + .getCatalogManager() + .getDataTypeFactory(); + return relBuilder.functionScan( + BridgingSqlFunction.of( + dataTypeFactory, typeFactory, - function, - scala.Option.empty()); + SqlKind.OTHER_FUNCTION, + calculatedTable.getFunctionIdentifier(), + calculatedTable.getFunctionDefinition(), + calculatedTable.getFunctionDefinition().getTypeInference(dataTypeFactory)), Review comment: Same comment as 
in `UserDefinedFunctionConvertRule`. I think we should obtain the type inference earlier in the API. Otherwise, an incorrectly implemented UDF fails quite late.
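The concern about late failures can be illustrated with a small sketch. Everything here is hypothetical (`EagerInferenceExample`, `Definition`, `ResolvedCall`, and the use of a `String` as the result type); it only shows the general idea of running type inference when the call is resolved in the API layer, so that a broken function definition is reported before planning starts.

```java
/** Hypothetical illustration of resolving type inference eagerly; not Flink code. */
final class EagerInferenceExample {

    private EagerInferenceExample() {
    }

    /** Minimal stand-in for a function definition; inference may throw for a broken UDF. */
    interface Definition {
        String inferResultType();
    }

    /** A resolved call that runs inference in its constructor instead of in the planner. */
    static final class ResolvedCall {

        private final String resultType;

        ResolvedCall(Definition definition) {
            // Any exception from a wrongly implemented function surfaces here,
            // at resolution time, rather than later during plan translation.
            this.resultType = definition.inferResultType();
        }

        String getResultType() {
            return resultType;
        }
    }
}
```

In this sketch the planner side would only read `getResultType()` and never trigger inference itself.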
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397291804 ## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/table/FunctionITCase.java ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.table; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.api.Expressions.call; + +/** + * Tests for user defined functions in the Table API. + */ +public class FunctionITCase extends AbstractTestBase { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testDataTypeBasedTypeInferenceNotSupported() throws Exception { + thrown.expect(ValidationException.class); + thrown.expectMessage("The new type inference for functions is only supported in the Blink planner."); + + StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecEnvironment); + + Table table = tableEnvironment + .sqlQuery("SELECT * FROM (VALUES (1)) AS TableName(f0)") + .select(call(new SimpleScalarFunction(), $("f0"))); + tableEnvironment.toAppendStream(table, Row.class).print(); + + streamExecEnvironment.execute(); + } + + @Test + public void testDataTypeBasedTypeInferenceNotSupportedInLateralJoin() throws Exception { + thrown.expect(ValidationException.class); + thrown.expectMessage("The new type inference for functions is only supported in the Blink planner."); + + StreamExecutionEnvironment 
streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecEnvironment);
+
+        Table table = tableEnvironment
+            .sqlQuery("SELECT * FROM (VALUES ('A,B,C')) AS TableName(f0)")
+            .joinLateral(call(new SimpleTableFunction(), $("f0")).as("a", "b"))
+            .select($("a"), $("b"));
+        tableEnvironment.toAppendStream(table, Row.class).print();
+
+        streamExecEnvironment.execute();
+    }
+
+    /**

Review comment:
remove comment
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397294036 ## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/table/FunctionITCase.java ## @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.runtime.stream.table; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.FunctionHint; +import org.apache.flink.table.api.Table; +import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.java.StreamTableEnvironment; +import org.apache.flink.table.functions.ScalarFunction; +import org.apache.flink.table.functions.TableFunction; +import org.apache.flink.test.util.AbstractTestBase; +import org.apache.flink.types.Row; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.table.api.Expressions.call; + +/** + * Tests for user defined functions in the Table API. + */ +public class FunctionITCase extends AbstractTestBase { + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testDataTypeBasedTypeInferenceNotSupported() throws Exception { + thrown.expect(ValidationException.class); + thrown.expectMessage("The new type inference for functions is only supported in the Blink planner."); + + StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecEnvironment); + + Table table = tableEnvironment + .sqlQuery("SELECT * FROM (VALUES (1)) AS TableName(f0)") + .select(call(new SimpleScalarFunction(), $("f0"))); + tableEnvironment.toAppendStream(table, Row.class).print(); + + streamExecEnvironment.execute(); + } + + @Test + public void testDataTypeBasedTypeInferenceNotSupportedInLateralJoin() throws Exception { + thrown.expect(ValidationException.class); + thrown.expectMessage("The new type inference for functions is only supported in the Blink planner."); + + StreamExecutionEnvironment 
streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
+        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecEnvironment);
+
+        Table table = tableEnvironment
+            .sqlQuery("SELECT * FROM (VALUES ('A,B,C')) AS TableName(f0)")
+            .joinLateral(call(new SimpleTableFunction(), $("f0")).as("a", "b"))
+            .select($("a"), $("b"));
+        tableEnvironment.toAppendStream(table, Row.class).print();
+
+        streamExecEnvironment.execute();
+    }
+
+    /**
+     * Scalar function that uses new type inference stack.
+     */
+    public static class SimpleScalarFunction extends ScalarFunction {
+        public long eval(Integer i) {
+            return i;
+        }
+    }
+
+    /**

Review comment:
remove comment and simplify code?
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397255980 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java ## @@ -59,82 +59,101 @@ public QueryOperation create(ResolvedExpression callExpr, String[] leftTableFiel return callExpr.accept(calculatedTableCreator); } - private class FunctionTableCallVisitor extends ResolvedExpressionDefaultVisitor> { - - private String[] leftTableFieldNames; + private static class FunctionTableCallVisitor extends ResolvedExpressionDefaultVisitor { + private List leftTableFieldNames; + private static final String ATOMIC_FIELD_NAME = "f0"; public FunctionTableCallVisitor(String[] leftTableFieldNames) { - this.leftTableFieldNames = leftTableFieldNames; + this.leftTableFieldNames = Arrays.asList(leftTableFieldNames); } @Override - public CalculatedQueryOperation visit(CallExpression call) { + public CalculatedQueryOperation visit(CallExpression call) { FunctionDefinition definition = call.getFunctionDefinition(); if (definition.equals(AS)) { return unwrapFromAlias(call); - } else if (definition instanceof TableFunctionDefinition) { - return createFunctionCall( - (TableFunctionDefinition) definition, - Collections.emptyList(), - call.getResolvedChildren()); - } else { - return defaultMethod(call); } + + return createFunctionCall(call, Collections.emptyList(), call.getResolvedChildren()); } - private CalculatedQueryOperation unwrapFromAlias(CallExpression call) { + private CalculatedQueryOperation unwrapFromAlias(CallExpression call) { List children = call.getChildren(); List aliases = children.subList(1, children.size()) .stream() .map(alias -> ExpressionUtils.extractValue(alias, String.class) .orElseThrow(() -> new ValidationException("Unexpected alias: " + alias))) .collect(toList()); - if 
(!isFunctionOfKind(children.get(0), TABLE)) { + if (!(children.get(0) instanceof CallExpression)) { throw fail(); } CallExpression tableCall = (CallExpression) children.get(0); - TableFunctionDefinition tableFunctionDefinition = - (TableFunctionDefinition) tableCall.getFunctionDefinition(); - return createFunctionCall(tableFunctionDefinition, aliases, tableCall.getResolvedChildren()); + return createFunctionCall(tableCall, aliases, tableCall.getResolvedChildren()); } - private CalculatedQueryOperation createFunctionCall( - TableFunctionDefinition tableFunctionDefinition, + private CalculatedQueryOperation createFunctionCall( + CallExpression callExpression, List aliases, List parameters) { - TypeInformation resultType = tableFunctionDefinition.getResultType(); - int callArity = resultType.getTotalFields(); - int aliasesSize = aliases.size(); + FunctionDefinition functionDefinition = callExpression.getFunctionDefinition(); + FunctionIdentifier functionIdentifier = ApiExpressionUtils.getFunctionIdentifier(callExpression); + final TableSchema tableSchema = adjustNames( + extractSchema(callExpression.getOutputDataType()), + aliases, + functionIdentifier); + + return new CalculatedQueryOperation( + functionDefinition, + functionIdentifier, + parameters, + tableSchema); + } - String[] fieldNames; + private TableSchema extractSchema(DataType resultType) { +
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397253296 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java ## @@ -59,82 +59,101 @@ public QueryOperation create(ResolvedExpression callExpr, String[] leftTableFiel return callExpr.accept(calculatedTableCreator); } - private class FunctionTableCallVisitor extends ResolvedExpressionDefaultVisitor> { - - private String[] leftTableFieldNames; + private static class FunctionTableCallVisitor extends ResolvedExpressionDefaultVisitor { + private List leftTableFieldNames; + private static final String ATOMIC_FIELD_NAME = "f0"; public FunctionTableCallVisitor(String[] leftTableFieldNames) { - this.leftTableFieldNames = leftTableFieldNames; + this.leftTableFieldNames = Arrays.asList(leftTableFieldNames); } @Override - public CalculatedQueryOperation visit(CallExpression call) { + public CalculatedQueryOperation visit(CallExpression call) { FunctionDefinition definition = call.getFunctionDefinition(); if (definition.equals(AS)) { return unwrapFromAlias(call); - } else if (definition instanceof TableFunctionDefinition) { - return createFunctionCall( - (TableFunctionDefinition) definition, - Collections.emptyList(), - call.getResolvedChildren()); - } else { - return defaultMethod(call); } + + return createFunctionCall(call, Collections.emptyList(), call.getResolvedChildren()); } - private CalculatedQueryOperation unwrapFromAlias(CallExpression call) { + private CalculatedQueryOperation unwrapFromAlias(CallExpression call) { List children = call.getChildren(); List aliases = children.subList(1, children.size()) .stream() .map(alias -> ExpressionUtils.extractValue(alias, String.class) .orElseThrow(() -> new ValidationException("Unexpected alias: " + alias))) .collect(toList()); - if 
(!isFunctionOfKind(children.get(0), TABLE)) { + if (!(children.get(0) instanceof CallExpression)) { Review comment: just to make sure: we are failing now in the code gen, right? This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397250681 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CalculatedQueryOperation.java ## @@ -86,4 +87,5 @@ public String asSummaryString() { public U accept(QueryOperationVisitor visitor) { return visitor.visit(this); } + Review comment: nit: unnecessary
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397273112 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/UserDefinedFunctionConvertRule.java ## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.expressions.converter; + +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; + +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A call expression converter rule that converts calls to user defined functions. 
+ */ +public class UserDefinedFunctionConvertRule implements CallExpressionConvertRule { + @Override + public Optional convert( + CallExpression call, + ConvertContext context) { + if (!(call.getFunctionDefinition() instanceof UserDefinedFunction)) { Review comment: Is it necessary to limit this code to `UserDefinedFunctions`?
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397250326 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CalculatedQueryOperation.java ## @@ -33,34 +34,34 @@ * Describes a relational operation that was created from applying a {@link TableFunction}. */ @Internal -public class CalculatedQueryOperation implements QueryOperation { +public class CalculatedQueryOperation implements QueryOperation { - private final TableFunction tableFunction; + private final FunctionDefinition functionDefinition; + private final FunctionIdentifier functionIdentifier; private final List parameters; - private final TypeInformation resultType; private final TableSchema tableSchema; public CalculatedQueryOperation( - TableFunction tableFunction, + FunctionDefinition functionDefinition, + FunctionIdentifier functionIdentifier, List parameters, - TypeInformation resultType, TableSchema tableSchema) { - this.tableFunction = tableFunction; + this.functionDefinition = functionDefinition; + this.functionIdentifier = functionIdentifier; this.parameters = parameters; - this.resultType = resultType; this.tableSchema = tableSchema; } - public TableFunction getTableFunction() { - return tableFunction; + public FunctionDefinition getFunctionDefinition() { + return functionDefinition; } - public List getParameters() { - return parameters; + public FunctionIdentifier getFunctionIdentifier() { + return functionIdentifier; } - public TypeInformation getResultType() { - return resultType; + public List getParameters() { Review comment: can we call it "arguments" everywhere consistently? also in the `call(..., params)` This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397280834 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala ## @@ -65,9 +68,11 @@ object CorrelateCodeGenerator { // according to the SQL standard, every scalar function should also be a table function // but we don't allow that for now -if (!rexCall.getOperator.isInstanceOf[BridgingSqlFunction] && -!rexCall.getOperator.isInstanceOf[TableSqlFunction]) { - throw new ValidationException("Currently, only table functions can emit rows.") +rexCall.getOperator match { + case func: BridgingSqlFunction if func.getDefinition.getKind == FunctionKind.TABLE => //ok Review comment: nit: whitespace could be moved into a hotfix commit and merged immediately
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397236012 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java ## @@ -515,6 +516,14 @@ public static ApiExpression call(String path, Object... params) { Arrays.stream(params).map(ApiExpressionUtils::objectToExpression).toArray(Expression[]::new))); } + /** +* A call to an unregistered, inline function. For functions that have been registered before and Review comment: very nit: new line between the two sentences
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397266524 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java ## @@ -157,20 +161,79 @@ DataTypes.BOOLEAN() )), - TestSpec.test("Lookup system function call") + TestSpec.test("Lookup legacy scalar function call") .inputSchemas( TableSchema.builder() .field("f0", DataTypes.INT()) .build() ) - .lookupFunction("func", new ScalarFunctionDefinition("func", new ScalarFunc())) + .lookupFunction("func", new ScalarFunctionDefinition("func", new LegacyScalarFunc())) .select(call("func", 1, $("f0"))) .equalTo(new CallExpression( FunctionIdentifier.of("func"), - new ScalarFunctionDefinition("func", new ScalarFunc()), + new ScalarFunctionDefinition("func", new LegacyScalarFunc()), Arrays.asList(valueLiteral(1), new FieldReferenceExpression("f0", DataTypes.INT(), 0, 0)), DataTypes.INT().bridgedTo(Integer.class) - ))); + )), + + TestSpec.test("Lookup system function call") + .inputSchemas( + TableSchema.builder() + .field("f0", DataTypes.INT()) + .build() + ) + .lookupFunction("func", new ScalarFunc()) + .select(call("func", 1, $("f0"))) + .equalTo(new CallExpression( + FunctionIdentifier.of("func"), + new ScalarFunc(), + Arrays.asList(valueLiteral(1), new FieldReferenceExpression("f0", DataTypes.INT(), 0, 0)), + DataTypes.INT().notNull().bridgedTo(int.class) + )), + + TestSpec.test("Lookup catalog function call") + .inputSchemas( + TableSchema.builder() + .field("f0", DataTypes.INT()) + .build() + ) + .lookupFunction(ObjectIdentifier.of("cat", "db", "func"), new ScalarFunc()) + .select(call("cat.db.func", 1, $("f0"))) + .equalTo(new CallExpression( Review comment: nit: a new line for every `new Call`? like below? This is an automated message from the Apache Git Service. 
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397249046 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java ## @@ -198,4 +204,48 @@ public static boolean isFunction(Expression expression, BuiltInFunctionDefinitio } return false; } + + /** +* Extracts a {@link FunctionIdentifier} for the given {@link CallExpression}. If the call is an inline funcion +* ({@link CallExpression#getFunctionIdentifier()} returns empty) +* +* it uses {@link BuiltInFunctionDefinition#getName()} ()} for built in functions +* it uses {@link UserDefinedFunction#functionIdentifier()} for user defined functions +* it uses {@link FunctionDefinition#toString()} ()} for any other functions +* +*/ + public static FunctionIdentifier getFunctionIdentifier(CallExpression callExpression) { + if (callExpression.getFunctionIdentifier().isPresent()) { + return callExpression.getFunctionIdentifier().get(); + } else { + return getInlineFunctionIdentifier(callExpression); + } + } + + private static FunctionIdentifier getInlineFunctionIdentifier(CallExpression callExpression) { + FunctionDefinition functionDefinition = callExpression.getFunctionDefinition(); + if (functionDefinition instanceof BuiltInFunctionDefinition) { + return FunctionIdentifier.of(((BuiltInFunctionDefinition) functionDefinition).getName()); Review comment: should we merge this logic into `BuiltInFunctionDefinition`?
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397242372 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java ## @@ -198,4 +204,48 @@ public static boolean isFunction(Expression expression, BuiltInFunctionDefinitio } return false; } + + /** +* Extracts a {@link FunctionIdentifier} for the given {@link CallExpression}. If the call is an inline funcion +* ({@link CallExpression#getFunctionIdentifier()} returns empty) +* +* it uses {@link BuiltInFunctionDefinition#getName()} ()} for built in functions +* it uses {@link UserDefinedFunction#functionIdentifier()} for user defined functions +* it uses {@link FunctionDefinition#toString()} ()} for any other functions Review comment: This case should not be supported. Either we are dealing with built-ins or user-defined functions.
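A minimal sketch of the behavior asked for in the comment above: derive an identifier for an inline call only when the definition is a built-in or a user-defined function, and fail for anything else instead of falling back to `toString()`. All types here (`Definition`, `BuiltIn`, `UserDefined`) and the method `identifierOf` are hypothetical stand-ins chosen for illustration, not Flink's classes, and plain strings stand in for `FunctionIdentifier`.

```java
import java.util.Optional;

public class InlineIdentifierSketch {

    /** Hypothetical stand-in for a function definition; not Flink's interface. */
    interface Definition {}

    /** Hypothetical built-in function that carries its own name. */
    static final class BuiltIn implements Definition {
        final String name;

        BuiltIn(String name) {
            this.name = name;
        }
    }

    /** Hypothetical user-defined function that can describe itself. */
    static final class UserDefined implements Definition {
        final String identifier;

        UserDefined(String identifier) {
            this.identifier = identifier;
        }
    }

    /**
     * Uses the registered identifier when present; otherwise derives one for
     * built-in and user-defined functions only and rejects everything else.
     */
    static String identifierOf(Optional<String> registered, Definition definition) {
        if (registered.isPresent()) {
            return registered.get();
        }
        if (definition instanceof BuiltIn) {
            return ((BuiltIn) definition).name;
        }
        if (definition instanceof UserDefined) {
            return ((UserDefined) definition).identifier;
        }
        // No toString() fallback: an unknown kind of definition is an error.
        throw new IllegalArgumentException(
            "Unsupported inline function definition: " + definition.getClass().getName());
    }

    public static void main(String[] args) {
        System.out.println(identifierOf(Optional.of("cat.db.func"), new BuiltIn("abs")));
        System.out.println(identifierOf(Optional.empty(), new BuiltIn("abs")));
        System.out.println(identifierOf(Optional.empty(), new UserDefined("MyScalarFunc")));
    }
}
```

Throwing in the last branch turns an unexpected definition type into an immediate failure rather than an identifier built from an arbitrary `toString()` value.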
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397271315 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java ## @@ -71,18 +72,26 @@ public class ExpressionConverter implements ExpressionVisitor { private static final List FUNCTION_CONVERT_CHAIN = Arrays.asList( - new ScalarFunctionConvertRule(), + new LegacyScalarFunctionConvertRule(), + new UserDefinedFunctionConvertRule(), new OverConvertRule(), new DirectConvertRule(), new CustomizedConvertRule() ); private final RelBuilder relBuilder; private final FlinkTypeFactory typeFactory; + private final DataTypeFactory dataTypeFactory; public ExpressionConverter(RelBuilder relBuilder) { this.relBuilder = relBuilder; this.typeFactory = (FlinkTypeFactory) relBuilder.getRexBuilder().getTypeFactory(); + this.dataTypeFactory = relBuilder.getCluster() Review comment: Use `org.apache.flink.table.planner.utils.ShortcutUtils` for readability.
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397267739 ## File path: flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java ## @@ -186,13 +249,37 @@ public void testResolvingExpressions() { } /** -* Test scalar function that uses legacy type inference logic. +* Test scalar function. */ + @FunctionHint( + input = @DataTypeHint(inputGroup = InputGroup.ANY), + isVarArgs = true, + output = @DataTypeHint(value = "INTEGER NOT NULL", + bridgedTo = int.class)) Review comment: nit: fix formatting
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397276251 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/UserDefinedFunctionConvertRule.java ## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.expressions.converter; + +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; + +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A call expression converter rule that converts calls to user defined functions. 
+ */ +public class UserDefinedFunctionConvertRule implements CallExpressionConvertRule { + @Override + public Optional convert( + CallExpression call, + ConvertContext context) { + if (!(call.getFunctionDefinition() instanceof UserDefinedFunction)) { + return Optional.empty(); + } + + switch (call.getFunctionDefinition().getKind()) { + case SCALAR: + case TABLE: + List args = call.getChildren().stream().map(context::toRexNode).collect(Collectors.toList()); + return Optional.of(context.getRelBuilder().call( + BridgingSqlFunction.of( + context.getDataTypeFactory(), + context.getTypeFactory(), + SqlKind.OTHER_FUNCTION, + ApiExpressionUtils.getFunctionIdentifier(call), Review comment: It seems wrong to me that we are using ApiUtils in the planner. This should be done earlier when constructing the call.
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397290669 ## File path: flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java ## @@ -444,6 +450,31 @@ private void testUserDefinedCatalogFunction(TableEnvironment tableEnv, String cr tableEnv.sqlUpdate("drop table t2"); } + @Test + public void testDataTypeBasedTypeInferenceNotSupported() throws Exception { + thrown.expect(ValidationException.class); + thrown.expectMessage("The new type inference for functions is only supported in the Blink planner."); + + StreamExecutionEnvironment streamExecEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(); + StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(streamExecEnvironment); + + tableEnvironment.createTemporarySystemFunction("func", SimpleScalarFunction.class); + Table table = tableEnvironment + .sqlQuery("SELECT func(1)"); + tableEnvironment.toAppendStream(table, Row.class).print(); + + streamExecEnvironment.execute(); + } + + /** Review comment: remove comment
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397238739 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java ## @@ -198,4 +204,48 @@ public static boolean isFunction(Expression expression, BuiltInFunctionDefinitio } return false; } + + /** +* Extracts a {@link FunctionIdentifier} for the given {@link CallExpression}. If the call is an inline funcion +* ({@link CallExpression#getFunctionIdentifier()} returns empty) Review comment: Something is wrong with this JavaDoc. Can you verify all parentheses?
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397281589 ## File path: flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala ## @@ -37,16 +37,29 @@ import _root_.scala.collection.JavaConverters._ class PlannerExpressionConverter private extends ApiExpressionVisitor[PlannerExpression] { override def visit(call: CallExpression): PlannerExpression = { -translateCall(call.getFunctionDefinition, call.getChildren.asScala) +val definition = call.getFunctionDefinition +translateCall( + definition, call.getChildren.asScala, + () => if (definition.getKind == FunctionKind.AGGREGATE || +definition.getKind == FunctionKind.TABLE_AGGREGATE) { Review comment: nit: fix indentation
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397274081 ## File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/UserDefinedFunctionConvertRule.java ## @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.expressions.converter; + +import org.apache.flink.table.expressions.ApiExpressionUtils; +import org.apache.flink.table.expressions.CallExpression; +import org.apache.flink.table.functions.UserDefinedFunction; +import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; + +import org.apache.calcite.rex.RexNode; +import org.apache.calcite.sql.SqlKind; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; + +/** + * A call expression converter rule that converts calls to user defined functions. 
+ */ +public class UserDefinedFunctionConvertRule implements CallExpressionConvertRule { + @Override + public Optional convert( + CallExpression call, + ConvertContext context) { + if (!(call.getFunctionDefinition() instanceof UserDefinedFunction)) { + return Optional.empty(); + } + + switch (call.getFunctionDefinition().getKind()) { + case SCALAR: + case TABLE: + List args = call.getChildren().stream().map(context::toRexNode).collect(Collectors.toList()); + return Optional.of(context.getRelBuilder().call( + BridgingSqlFunction.of( + context.getDataTypeFactory(), + context.getTypeFactory(), + SqlKind.OTHER_FUNCTION, + ApiExpressionUtils.getFunctionIdentifier(call), + call.getFunctionDefinition(), + call.getFunctionDefinition().getTypeInference(context.getDataTypeFactory())), Review comment: This can throw errors. We should wrap the exception again like in `FunctionCatalogOperatorTable`.
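A minimal sketch of the wrapping pattern the comment above refers to: run the type inference extraction inside a `try` block and rethrow any failure with the function name attached, keeping the original exception as the cause. The class `ValidationError`, the helper `extractTypeInference`, and the message text are assumptions made for illustration; this does not reproduce the actual code or wording in `FunctionCatalogOperatorTable`.

```java
import java.util.function.Supplier;

public class TypeInferenceWrappingSketch {

    /** Hypothetical stand-in for a validation error reported to the user. */
    static final class ValidationError extends RuntimeException {
        ValidationError(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /**
     * Runs a type inference extraction step and rethrows any failure with
     * the function name attached, keeping the original exception as the cause.
     */
    static <T> T extractTypeInference(String functionName, Supplier<T> extraction) {
        try {
            return extraction.get();
        } catch (RuntimeException e) {
            throw new ValidationError(
                "An error occurred in the type inference logic of function '"
                    + functionName + "'.",
                e);
        }
    }

    public static void main(String[] args) {
        // Successful extraction passes the result through unchanged.
        System.out.println(extractTypeInference("func", () -> "INT NOT NULL"));

        // A failing extraction surfaces as ValidationError with the cause attached.
        try {
            extractTypeInference("func", () -> {
                throw new IllegalStateException("missing @DataTypeHint");
            });
        } catch (ValidationError e) {
            System.out.println(e.getMessage() + " Cause: " + e.getCause().getMessage());
        }
    }
}
```

Keeping the original exception as the cause preserves the stack trace of the failing extraction while the outer message tells the user which function was affected.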
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397259142 ## File path: flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java ## @@ -59,82 +59,101 @@ public QueryOperation create(ResolvedExpression callExpr, String[] leftTableFiel return callExpr.accept(calculatedTableCreator); } - private class FunctionTableCallVisitor extends ResolvedExpressionDefaultVisitor> { - - private String[] leftTableFieldNames; + private static class FunctionTableCallVisitor extends ResolvedExpressionDefaultVisitor { + private List leftTableFieldNames; + private static final String ATOMIC_FIELD_NAME = "f0"; public FunctionTableCallVisitor(String[] leftTableFieldNames) { - this.leftTableFieldNames = leftTableFieldNames; + this.leftTableFieldNames = Arrays.asList(leftTableFieldNames); } @Override - public CalculatedQueryOperation visit(CallExpression call) { + public CalculatedQueryOperation visit(CallExpression call) { FunctionDefinition definition = call.getFunctionDefinition(); if (definition.equals(AS)) { return unwrapFromAlias(call); - } else if (definition instanceof TableFunctionDefinition) { - return createFunctionCall( - (TableFunctionDefinition) definition, - Collections.emptyList(), - call.getResolvedChildren()); - } else { - return defaultMethod(call); } + + return createFunctionCall(call, Collections.emptyList(), call.getResolvedChildren()); } - private CalculatedQueryOperation unwrapFromAlias(CallExpression call) { + private CalculatedQueryOperation unwrapFromAlias(CallExpression call) { List children = call.getChildren(); List aliases = children.subList(1, children.size()) .stream() .map(alias -> ExpressionUtils.extractValue(alias, String.class) .orElseThrow(() -> new ValidationException("Unexpected alias: " + alias))) .collect(toList()); - if 
(!isFunctionOfKind(children.get(0), TABLE)) { + if (!(children.get(0) instanceof CallExpression)) { throw fail(); } CallExpression tableCall = (CallExpression) children.get(0); - TableFunctionDefinition tableFunctionDefinition = - (TableFunctionDefinition) tableCall.getFunctionDefinition(); - return createFunctionCall(tableFunctionDefinition, aliases, tableCall.getResolvedChildren()); + return createFunctionCall(tableCall, aliases, tableCall.getResolvedChildren()); } - private CalculatedQueryOperation createFunctionCall( - TableFunctionDefinition tableFunctionDefinition, + private CalculatedQueryOperation createFunctionCall( + CallExpression callExpression, List aliases, List parameters) { - TypeInformation resultType = tableFunctionDefinition.getResultType(); - int callArity = resultType.getTotalFields(); - int aliasesSize = aliases.size(); + FunctionDefinition functionDefinition = callExpression.getFunctionDefinition(); + FunctionIdentifier functionIdentifier = ApiExpressionUtils.getFunctionIdentifier(callExpression); + final TableSchema tableSchema = adjustNames( + extractSchema(callExpression.getOutputDataType()), + aliases, + functionIdentifier); + + return new CalculatedQueryOperation( + functionDefinition, + functionIdentifier, + parameters, + tableSchema); + } - String[] fieldNames; + private TableSchema extractSchema(DataType resultType) { Review
[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl
twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl URL: https://github.com/apache/flink/pull/11280#discussion_r397295153 ## File path: flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala ## @@ -68,10 +68,6 @@ class CorrelateValidationTest extends TableTestBase { //= throw exception when the called function is a scalar function util.addFunction("func0", Func0) -// Java Table API call -expectExceptionThrown( Review comment: Why is this not thrown anymore?