[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-25 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397920958
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
 ##
 @@ -129,6 +125,10 @@ public FlinkTypeFactory getTypeFactory() {
return typeFactory;
}
 
+   public FunctionIdentifier getIdentifier() {
 
 Review comment:
   nit: return Optional


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-25 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397919433
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingUtils.java
 ##
 @@ -52,8 +52,15 @@
  * Utilities for bridging {@link FunctionDefinition} with Calcite's 
representation of functions.
  */
 final class BridgingUtils {
+   static String createName(FunctionIdentifier identifier, 
FunctionDefinition definition) {
 
 Review comment:
   nit: add `@Nullable` to all arguments in this util


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-25 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397899170
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java
 ##
 @@ -110,9 +114,11 @@ public static BridgingSqlFunction of(
dataTypeFactory,
typeFactory,
kind,
-   identifier,
+   identifier != null ? createName(identifier) : 
createInlineFunctionName(definition),
 
 Review comment:
   can we integrate the case distinction in the methods `createName()`, 
`createSqlIdentifier()`, `createSqlFunctionCategory()` directly? First of all, 
I'm not big fan of inline if/else and more importantly, we need the same logic 
for `BridgingSqlAggFunction` which is why the `BridgingUtils` class exists.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-25 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397888975
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/table/FunctionITCase.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+
+/**
+ * Tests for user defined functions in the Table API.
+ */
+public class FunctionITCase extends AbstractTestBase {
+
+   @Rule
+   public ExpectedException thrown = ExpectedException.none();
+
+   @Test
+   public void testDataTypeBasedTypeInferenceNotSupported() throws 
Exception {
+   thrown.expect(ValidationException.class);
+   thrown.expectMessage("The new type inference for functions is 
only supported in the Blink planner.");
+
+   StreamExecutionEnvironment streamExecEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(streamExecEnvironment);
+
+   Table table = tableEnvironment
+   .sqlQuery("SELECT * FROM (VALUES (1)) AS TableName(f0)")
+   .select(call(new SimpleScalarFunction(), $("f0")));
+   tableEnvironment.toAppendStream(table, Row.class).print();
+
+   streamExecEnvironment.execute();
+   }
+
+   @Test
+   public void testDataTypeBasedTypeInferenceNotSupportedInLateralJoin() 
throws Exception {
+   thrown.expect(ValidationException.class);
+   thrown.expectMessage("The new type inference for functions is 
only supported in the Blink planner.");
+
+   StreamExecutionEnvironment streamExecEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(streamExecEnvironment);
+
+   Table table = tableEnvironment
+   .sqlQuery("SELECT * FROM (VALUES ('A,B,C')) AS 
TableName(f0)")
+   .joinLateral(call(new SimpleTableFunction(), 
$("f0")).as("a", "b"))
+   .select($("a"), $("b"));
+   tableEnvironment.toAppendStream(table, Row.class).print();
+
+   streamExecEnvironment.execute();
+   }
+
+   /**
+* Scalar function that uses new type inference stack.
+*/
+   public static class SimpleScalarFunction extends ScalarFunction {
+   public long eval(Integer i) {
+   return i;
+   }
+   }
+
+   /**
 
 Review comment:
   I think the function has too much code for just testing an exception. It is 
dead code.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-25 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397888192
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
 ##
 @@ -444,6 +450,31 @@ private void 
testUserDefinedCatalogFunction(TableEnvironment tableEnv, String cr
tableEnv.sqlUpdate("drop table t2");
}
 
+   @Test
+   public void testDataTypeBasedTypeInferenceNotSupported() throws 
Exception {
+   thrown.expect(ValidationException.class);
+   thrown.expectMessage("The new type inference for functions is 
only supported in the Blink planner.");
+
+   StreamExecutionEnvironment streamExecEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(streamExecEnvironment);
+
+   tableEnvironment.createTemporarySystemFunction("func", 
SimpleScalarFunction.class);
+   Table table = tableEnvironment
+   .sqlQuery("SELECT func(1)");
+   tableEnvironment.toAppendStream(table, Row.class).print();
+
+   streamExecEnvironment.execute();
+   }
+
+   /**
 
 Review comment:
   True, but there we really tested different kind of functions. The comment 
has nothing to do with the new type inference. But I don't have a strong 
opinion there.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-25 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397888192
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
 ##
 @@ -444,6 +450,31 @@ private void 
testUserDefinedCatalogFunction(TableEnvironment tableEnv, String cr
tableEnv.sqlUpdate("drop table t2");
}
 
+   @Test
+   public void testDataTypeBasedTypeInferenceNotSupported() throws 
Exception {
+   thrown.expect(ValidationException.class);
+   thrown.expectMessage("The new type inference for functions is 
only supported in the Blink planner.");
+
+   StreamExecutionEnvironment streamExecEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(streamExecEnvironment);
+
+   tableEnvironment.createTemporarySystemFunction("func", 
SimpleScalarFunction.class);
+   Table table = tableEnvironment
+   .sqlQuery("SELECT func(1)");
+   tableEnvironment.toAppendStream(table, Row.class).print();
+
+   streamExecEnvironment.execute();
+   }
+
+   /**
 
 Review comment:
   True, but there we really tested different kind of functions. The comment 
has nothing to do with the new type inference. But I don't have a string 
opinion there.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-25 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397884272
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
 ##
 @@ -198,4 +204,48 @@ public static boolean isFunction(Expression expression, 
BuiltInFunctionDefinitio
}
return false;
}
+
+   /**
+* Extracts a {@link FunctionIdentifier} for the given {@link 
CallExpression}. If the call is an inline funcion
+* ({@link CallExpression#getFunctionIdentifier()} returns empty)
+* 
+* it uses {@link BuiltInFunctionDefinition#getName()} ()} for 
built in functions
+* it uses {@link UserDefinedFunction#functionIdentifier()} for 
user defined functions
+* it uses {@link FunctionDefinition#toString()} ()} for any 
other functions
+* 
+*/
+   public static FunctionIdentifier getFunctionIdentifier(CallExpression 
callExpression) {
+   if (callExpression.getFunctionIdentifier().isPresent()) {
+   return callExpression.getFunctionIdentifier().get();
+   } else {
+   return getInlineFunctionIdentifier(callExpression);
+   }
+   }
+
+   private static FunctionIdentifier 
getInlineFunctionIdentifier(CallExpression callExpression) {
+   FunctionDefinition functionDefinition = 
callExpression.getFunctionDefinition();
+   if (functionDefinition instanceof BuiltInFunctionDefinition) {
+   return 
FunctionIdentifier.of(((BuiltInFunctionDefinition) 
functionDefinition).getName());
 
 Review comment:
   After an offline discussion, we concluded that all functions must come from 
the catalog manager and thus have an identifier except for inline functions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397276934
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/UserDefinedFunctionConvertRule.java
 ##
 @@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter;
+
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * A call expression converter rule that converts calls to user defined 
functions.
+ */
+public class UserDefinedFunctionConvertRule implements 
CallExpressionConvertRule {
+   @Override
+   public Optional convert(
+   CallExpression call,
+   ConvertContext context) {
+   if (!(call.getFunctionDefinition() instanceof 
UserDefinedFunction)) {
 
 Review comment:
   Or can this be a `FunctionDefinitionConvertRule`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397251630
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/OperationTreeBuilder.java
 ##
 @@ -471,9 +471,14 @@ public QueryOperation flatMap(Expression tableFunction, 
QueryOperation child) {
throw new ValidationException("Only a table function 
can be used in the flatMap operator.");
}
 
-   TypeInformation resultType = ((TableFunctionDefinition) 
((UnresolvedCallExpression) resolvedTableFunction)
-   .getFunctionDefinition())
-   .getResultType();
+   FunctionDefinition functionDefinition = 
((UnresolvedCallExpression) resolvedTableFunction)
+   .getFunctionDefinition();
+   if (!(functionDefinition instanceof TableFunctionDefinition)) {
 
 Review comment:
   please add a follow-up issue in FLINK-13191


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397278876
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
 ##
 @@ -270,36 +273,69 @@ public RelNode visit(SortQueryOperation sort) {
}
 
@Override
-   public  RelNode visit(CalculatedQueryOperation 
calculatedTable) {
-   DataType resultType = 
fromLegacyInfoToDataType(calculatedTable.getResultType());
-   TableFunction tableFunction = 
calculatedTable.getTableFunction();
-   String[] fieldNames = 
calculatedTable.getTableSchema().getFieldNames();
-
-   TypedFlinkTableFunction function = new 
TypedFlinkTableFunction(
-   tableFunction, fieldNames, resultType);
-
+   public RelNode visit(CalculatedQueryOperation calculatedTable) {
+   FunctionDefinition functionDefinition = 
calculatedTable.getFunctionDefinition();
+   List parameters = 
convertToRexNodes(calculatedTable.getParameters());
FlinkTypeFactory typeFactory = 
relBuilder.getTypeFactory();
+   if (functionDefinition instanceof 
TableFunctionDefinition) {
+   return convertLegacyTableFunction(
+   calculatedTable,
+   (TableFunctionDefinition) 
functionDefinition,
+   parameters,
+   typeFactory);
+   }
 
-   TableSqlFunction sqlFunction = new TableSqlFunction(
-   
FunctionIdentifier.of(tableFunction.functionIdentifier()),
-   tableFunction.toString(),
-   tableFunction,
-   resultType,
+   DataTypeFactory dataTypeFactory = 
relBuilder.getCluster()
+   .getPlanner()
+   .getContext()
+   .unwrap(FlinkContext.class)
+   .getCatalogManager()
+   .getDataTypeFactory();
+   return relBuilder.functionScan(
+   BridgingSqlFunction.of(
+   dataTypeFactory,
typeFactory,
-   function,
-   scala.Option.empty());
+   SqlKind.OTHER_FUNCTION,
+   calculatedTable.getFunctionIdentifier(),
+   calculatedTable.getFunctionDefinition(),
+   
calculatedTable.getFunctionDefinition().getTypeInference(dataTypeFactory)),
 
 Review comment:
   Same comment as in `UserDefinedFunctionConverterRule`. I think we should get 
the inference earlier in the API. Otherwise a wrong implemented UDF fails quite 
late.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397291804
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/table/FunctionITCase.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+
+/**
+ * Tests for user defined functions in the Table API.
+ */
+public class FunctionITCase extends AbstractTestBase {
+
+   @Rule
+   public ExpectedException thrown = ExpectedException.none();
+
+   @Test
+   public void testDataTypeBasedTypeInferenceNotSupported() throws 
Exception {
+   thrown.expect(ValidationException.class);
+   thrown.expectMessage("The new type inference for functions is 
only supported in the Blink planner.");
+
+   StreamExecutionEnvironment streamExecEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(streamExecEnvironment);
+
+   Table table = tableEnvironment
+   .sqlQuery("SELECT * FROM (VALUES (1)) AS TableName(f0)")
+   .select(call(new SimpleScalarFunction(), $("f0")));
+   tableEnvironment.toAppendStream(table, Row.class).print();
+
+   streamExecEnvironment.execute();
+   }
+
+   @Test
+   public void testDataTypeBasedTypeInferenceNotSupportedInLateralJoin() 
throws Exception {
+   thrown.expect(ValidationException.class);
+   thrown.expectMessage("The new type inference for functions is 
only supported in the Blink planner.");
+
+   StreamExecutionEnvironment streamExecEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(streamExecEnvironment);
+
+   Table table = tableEnvironment
+   .sqlQuery("SELECT * FROM (VALUES ('A,B,C')) AS 
TableName(f0)")
+   .joinLateral(call(new SimpleTableFunction(), 
$("f0")).as("a", "b"))
+   .select($("a"), $("b"));
+   tableEnvironment.toAppendStream(table, Row.class).print();
+
+   streamExecEnvironment.execute();
+   }
+
+   /**
 
 Review comment:
   remove comment


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397294036
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/table/FunctionITCase.java
 ##
 @@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.stream.table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.FunctionHint;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.api.java.StreamTableEnvironment;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.TableFunction;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.api.Expressions.call;
+
+/**
+ * Tests for user defined functions in the Table API.
+ */
+public class FunctionITCase extends AbstractTestBase {
+
+   @Rule
+   public ExpectedException thrown = ExpectedException.none();
+
+   @Test
+   public void testDataTypeBasedTypeInferenceNotSupported() throws 
Exception {
+   thrown.expect(ValidationException.class);
+   thrown.expectMessage("The new type inference for functions is 
only supported in the Blink planner.");
+
+   StreamExecutionEnvironment streamExecEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(streamExecEnvironment);
+
+   Table table = tableEnvironment
+   .sqlQuery("SELECT * FROM (VALUES (1)) AS TableName(f0)")
+   .select(call(new SimpleScalarFunction(), $("f0")));
+   tableEnvironment.toAppendStream(table, Row.class).print();
+
+   streamExecEnvironment.execute();
+   }
+
+   @Test
+   public void testDataTypeBasedTypeInferenceNotSupportedInLateralJoin() 
throws Exception {
+   thrown.expect(ValidationException.class);
+   thrown.expectMessage("The new type inference for functions is 
only supported in the Blink planner.");
+
+   StreamExecutionEnvironment streamExecEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(streamExecEnvironment);
+
+   Table table = tableEnvironment
+   .sqlQuery("SELECT * FROM (VALUES ('A,B,C')) AS 
TableName(f0)")
+   .joinLateral(call(new SimpleTableFunction(), 
$("f0")).as("a", "b"))
+   .select($("a"), $("b"));
+   tableEnvironment.toAppendStream(table, Row.class).print();
+
+   streamExecEnvironment.execute();
+   }
+
+   /**
+* Scalar function that uses new type inference stack.
+*/
+   public static class SimpleScalarFunction extends ScalarFunction {
+   public long eval(Integer i) {
+   return i;
+   }
+   }
+
+   /**
 
 Review comment:
   remove comment and simplify code?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397255980
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java
 ##
 @@ -59,82 +59,101 @@ public QueryOperation create(ResolvedExpression callExpr, 
String[] leftTableFiel
return callExpr.accept(calculatedTableCreator);
}
 
-   private class FunctionTableCallVisitor extends 
ResolvedExpressionDefaultVisitor> {
-
-   private String[] leftTableFieldNames;
+   private static class FunctionTableCallVisitor extends 
ResolvedExpressionDefaultVisitor {
+   private List leftTableFieldNames;
+   private static final String ATOMIC_FIELD_NAME = "f0";
 
public FunctionTableCallVisitor(String[] leftTableFieldNames) {
-   this.leftTableFieldNames = leftTableFieldNames;
+   this.leftTableFieldNames = 
Arrays.asList(leftTableFieldNames);
}
 
@Override
-   public CalculatedQueryOperation visit(CallExpression call) {
+   public CalculatedQueryOperation visit(CallExpression call) {
FunctionDefinition definition = 
call.getFunctionDefinition();
if (definition.equals(AS)) {
return unwrapFromAlias(call);
-   } else if (definition instanceof 
TableFunctionDefinition) {
-   return createFunctionCall(
-   (TableFunctionDefinition) definition,
-   Collections.emptyList(),
-   call.getResolvedChildren());
-   } else {
-   return defaultMethod(call);
}
+
+   return createFunctionCall(call, 
Collections.emptyList(), call.getResolvedChildren());
}
 
-   private CalculatedQueryOperation 
unwrapFromAlias(CallExpression call) {
+   private CalculatedQueryOperation unwrapFromAlias(CallExpression 
call) {
List children = call.getChildren();
List aliases = children.subList(1, 
children.size())
.stream()
.map(alias -> 
ExpressionUtils.extractValue(alias, String.class)
.orElseThrow(() -> new 
ValidationException("Unexpected alias: " + alias)))
.collect(toList());
 
-   if (!isFunctionOfKind(children.get(0), TABLE)) {
+   if (!(children.get(0) instanceof CallExpression)) {
throw fail();
}
 
CallExpression tableCall = (CallExpression) 
children.get(0);
-   TableFunctionDefinition tableFunctionDefinition =
-   (TableFunctionDefinition) 
tableCall.getFunctionDefinition();
-   return createFunctionCall(tableFunctionDefinition, 
aliases, tableCall.getResolvedChildren());
+   return createFunctionCall(tableCall, aliases, 
tableCall.getResolvedChildren());
}
 
-   private CalculatedQueryOperation createFunctionCall(
-   TableFunctionDefinition tableFunctionDefinition,
+   private CalculatedQueryOperation createFunctionCall(
+   CallExpression callExpression,
List aliases,
List parameters) {
-   TypeInformation resultType = 
tableFunctionDefinition.getResultType();
 
-   int callArity = resultType.getTotalFields();
-   int aliasesSize = aliases.size();
+   FunctionDefinition functionDefinition = 
callExpression.getFunctionDefinition();
+   FunctionIdentifier functionIdentifier = 
ApiExpressionUtils.getFunctionIdentifier(callExpression);
+   final TableSchema tableSchema = adjustNames(
+   
extractSchema(callExpression.getOutputDataType()),
+   aliases,
+   functionIdentifier);
+
+   return new CalculatedQueryOperation(
+   functionDefinition,
+   functionIdentifier,
+   parameters,
+   tableSchema);
+   }
 
-   String[] fieldNames;
+   private TableSchema extractSchema(DataType resultType) {
+ 

[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397253296
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java
 ##
 @@ -59,82 +59,101 @@ public QueryOperation create(ResolvedExpression callExpr, 
String[] leftTableFiel
return callExpr.accept(calculatedTableCreator);
}
 
-   private class FunctionTableCallVisitor extends 
ResolvedExpressionDefaultVisitor> {
-
-   private String[] leftTableFieldNames;
+   private static class FunctionTableCallVisitor extends 
ResolvedExpressionDefaultVisitor {
+   private List leftTableFieldNames;
+   private static final String ATOMIC_FIELD_NAME = "f0";
 
public FunctionTableCallVisitor(String[] leftTableFieldNames) {
-   this.leftTableFieldNames = leftTableFieldNames;
+   this.leftTableFieldNames = 
Arrays.asList(leftTableFieldNames);
}
 
@Override
-   public CalculatedQueryOperation visit(CallExpression call) {
+   public CalculatedQueryOperation visit(CallExpression call) {
FunctionDefinition definition = 
call.getFunctionDefinition();
if (definition.equals(AS)) {
return unwrapFromAlias(call);
-   } else if (definition instanceof 
TableFunctionDefinition) {
-   return createFunctionCall(
-   (TableFunctionDefinition) definition,
-   Collections.emptyList(),
-   call.getResolvedChildren());
-   } else {
-   return defaultMethod(call);
}
+
+   return createFunctionCall(call, 
Collections.emptyList(), call.getResolvedChildren());
}
 
-   private CalculatedQueryOperation 
unwrapFromAlias(CallExpression call) {
+   private CalculatedQueryOperation unwrapFromAlias(CallExpression 
call) {
List children = call.getChildren();
List aliases = children.subList(1, 
children.size())
.stream()
.map(alias -> 
ExpressionUtils.extractValue(alias, String.class)
.orElseThrow(() -> new 
ValidationException("Unexpected alias: " + alias)))
.collect(toList());
 
-   if (!isFunctionOfKind(children.get(0), TABLE)) {
+   if (!(children.get(0) instanceof CallExpression)) {
 
 Review comment:
   just to make sure: we are failing now in the code gen, right?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397250681
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CalculatedQueryOperation.java
 ##
 @@ -86,4 +87,5 @@ public String asSummaryString() {
public  U accept(QueryOperationVisitor visitor) {
return visitor.visit(this);
}
+
 
 Review comment:
   nit: unnecessary


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397273112
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/UserDefinedFunctionConvertRule.java
 ##
 @@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter;
+
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * A call expression converter rule that converts calls to user defined 
functions.
+ */
+public class UserDefinedFunctionConvertRule implements 
CallExpressionConvertRule {
+   @Override
+   public Optional convert(
+   CallExpression call,
+   ConvertContext context) {
+   if (!(call.getFunctionDefinition() instanceof 
UserDefinedFunction)) {
 
 Review comment:
   Is it necessary to limit this code to `UserDefinedFunctions`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397250326
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/CalculatedQueryOperation.java
 ##
 @@ -33,34 +34,34 @@
  * Describes a relational operation that was created from applying a {@link 
TableFunction}.
  */
 @Internal
-public class CalculatedQueryOperation implements QueryOperation {
+public class CalculatedQueryOperation implements QueryOperation {
 
-   private final TableFunction tableFunction;
+   private final FunctionDefinition functionDefinition;
+   private final FunctionIdentifier functionIdentifier;
private final List parameters;
-   private final TypeInformation resultType;
private final TableSchema tableSchema;
 
public CalculatedQueryOperation(
-   TableFunction tableFunction,
+   FunctionDefinition functionDefinition,
+   FunctionIdentifier functionIdentifier,
List parameters,
-   TypeInformation resultType,
TableSchema tableSchema) {
-   this.tableFunction = tableFunction;
+   this.functionDefinition = functionDefinition;
+   this.functionIdentifier = functionIdentifier;
this.parameters = parameters;
-   this.resultType = resultType;
this.tableSchema = tableSchema;
}
 
-   public TableFunction getTableFunction() {
-   return tableFunction;
+   public FunctionDefinition getFunctionDefinition() {
+   return functionDefinition;
}
 
-   public List getParameters() {
-   return parameters;
+   public FunctionIdentifier getFunctionIdentifier() {
+   return functionIdentifier;
}
 
-   public TypeInformation getResultType() {
-   return resultType;
+   public List getParameters() {
 
 Review comment:
   can we call it "arguments" everywhere consistently? also in the `call(..., 
params)`


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397280834
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/CorrelateCodeGenerator.scala
 ##
 @@ -65,9 +68,11 @@ object CorrelateCodeGenerator {
 
 // according to the SQL standard, every scalar function should also be a 
table function
 // but we don't allow that for now
-if (!rexCall.getOperator.isInstanceOf[BridgingSqlFunction] &&
-!rexCall.getOperator.isInstanceOf[TableSqlFunction]) {
-  throw new ValidationException("Currently, only table functions can emit 
rows.")
+rexCall.getOperator match {
+  case func: BridgingSqlFunction if func.getDefinition.getKind == 
FunctionKind.TABLE => //ok
 
 Review comment:
   nit: whitespace
   could be moved into a hotfix commit and merged immediately


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397236012
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Expressions.java
 ##
 @@ -515,6 +516,14 @@ public static ApiExpression call(String path, Object... 
params) {

Arrays.stream(params).map(ApiExpressionUtils::objectToExpression).toArray(Expression[]::new)));
}
 
+   /**
+* A call to an unregistered, inline function. For functions that have 
been registered before and
 
 Review comment:
   very nit: new line between the two sentences


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397266524
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
 ##
 @@ -157,20 +161,79 @@
DataTypes.BOOLEAN()
)),
 
-   TestSpec.test("Lookup system function call")
+   TestSpec.test("Lookup legacy scalar function call")
.inputSchemas(
TableSchema.builder()
.field("f0", DataTypes.INT())
.build()
)
-   .lookupFunction("func", new 
ScalarFunctionDefinition("func", new ScalarFunc()))
+   .lookupFunction("func", new 
ScalarFunctionDefinition("func", new LegacyScalarFunc()))
.select(call("func", 1, $("f0")))
.equalTo(new CallExpression(
FunctionIdentifier.of("func"),
-   new ScalarFunctionDefinition("func", 
new ScalarFunc()),
+   new ScalarFunctionDefinition("func", 
new LegacyScalarFunc()),
Arrays.asList(valueLiteral(1), new 
FieldReferenceExpression("f0", DataTypes.INT(), 0, 0)),
DataTypes.INT().bridgedTo(Integer.class)
-   )));
+   )),
+
+   TestSpec.test("Lookup system function call")
+   .inputSchemas(
+   TableSchema.builder()
+   .field("f0", DataTypes.INT())
+   .build()
+   )
+   .lookupFunction("func", new ScalarFunc())
+   .select(call("func", 1, $("f0")))
+   .equalTo(new CallExpression(
+   FunctionIdentifier.of("func"),
+   new ScalarFunc(),
+   Arrays.asList(valueLiteral(1), new 
FieldReferenceExpression("f0", DataTypes.INT(), 0, 0)),
+   
DataTypes.INT().notNull().bridgedTo(int.class)
+   )),
+
+   TestSpec.test("Lookup catalog function call")
+   .inputSchemas(
+   TableSchema.builder()
+   .field("f0", DataTypes.INT())
+   .build()
+   )
+   .lookupFunction(ObjectIdentifier.of("cat", 
"db", "func"), new ScalarFunc())
+   .select(call("cat.db.func", 1, $("f0")))
+   .equalTo(new CallExpression(
 
 Review comment:
   nit: a new line for every `new Call`? like below?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397249046
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
 ##
 @@ -198,4 +204,48 @@ public static boolean isFunction(Expression expression, 
BuiltInFunctionDefinitio
}
return false;
}
+
+   /**
+* Extracts a {@link FunctionIdentifier} for the given {@link 
CallExpression}. If the call is an inline funcion
+* ({@link CallExpression#getFunctionIdentifier()} returns empty)
+* 
+* it uses {@link BuiltInFunctionDefinition#getName()} ()} for 
built in functions
+* it uses {@link UserDefinedFunction#functionIdentifier()} for 
user defined functions
+* it uses {@link FunctionDefinition#toString()} ()} for any 
other functions
+* 
+*/
+   public static FunctionIdentifier getFunctionIdentifier(CallExpression 
callExpression) {
+   if (callExpression.getFunctionIdentifier().isPresent()) {
+   return callExpression.getFunctionIdentifier().get();
+   } else {
+   return getInlineFunctionIdentifier(callExpression);
+   }
+   }
+
+   private static FunctionIdentifier 
getInlineFunctionIdentifier(CallExpression callExpression) {
+   FunctionDefinition functionDefinition = 
callExpression.getFunctionDefinition();
+   if (functionDefinition instanceof BuiltInFunctionDefinition) {
+   return 
FunctionIdentifier.of(((BuiltInFunctionDefinition) 
functionDefinition).getName());
 
 Review comment:
   should we merge this logic into `BuiltInFunctionDefinition`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397242372
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
 ##
 @@ -198,4 +204,48 @@ public static boolean isFunction(Expression expression, 
BuiltInFunctionDefinitio
}
return false;
}
+
+   /**
+* Extracts a {@link FunctionIdentifier} for the given {@link 
CallExpression}. If the call is an inline funcion
+* ({@link CallExpression#getFunctionIdentifier()} returns empty)
+* 
+* it uses {@link BuiltInFunctionDefinition#getName()} ()} for 
built in functions
+* it uses {@link UserDefinedFunction#functionIdentifier()} for 
user defined functions
+* it uses {@link FunctionDefinition#toString()} ()} for any 
other functions
 
 Review comment:
   This case should not be supported. Either we are dealing with built-ins or 
user-defined functions.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397271315
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java
 ##
 @@ -71,18 +72,26 @@
 public class ExpressionConverter implements ExpressionVisitor {
 
private static final List 
FUNCTION_CONVERT_CHAIN = Arrays.asList(
-   new ScalarFunctionConvertRule(),
+   new LegacyScalarFunctionConvertRule(),
+   new UserDefinedFunctionConvertRule(),
new OverConvertRule(),
new DirectConvertRule(),
new CustomizedConvertRule()
);
 
private final RelBuilder relBuilder;
private final FlinkTypeFactory typeFactory;
+   private final DataTypeFactory dataTypeFactory;
 
public ExpressionConverter(RelBuilder relBuilder) {
this.relBuilder = relBuilder;
this.typeFactory = (FlinkTypeFactory) 
relBuilder.getRexBuilder().getTypeFactory();
+   this.dataTypeFactory = relBuilder.getCluster()
 
 Review comment:
   Use `org.apache.flink.table.planner.utils.ShortcutUtils` for readability.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397267739
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/expressions/resolver/ExpressionResolverTest.java
 ##
 @@ -186,13 +249,37 @@ public void testResolvingExpressions() {
}
 
/**
-* Test scalar function that uses legacy type inference logic.
+* Test scalar function.
 */
+   @FunctionHint(
+   input = @DataTypeHint(inputGroup = InputGroup.ANY),
+   isVarArgs = true,
+   output = @DataTypeHint(value = "INTEGER NOT NULL",
+   bridgedTo = int.class))
 
 Review comment:
   nit: fix formatting


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397276251
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/UserDefinedFunctionConvertRule.java
 ##
 @@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter;
+
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * A call expression converter rule that converts calls to user defined 
functions.
+ */
+public class UserDefinedFunctionConvertRule implements 
CallExpressionConvertRule {
+   @Override
+   public Optional convert(
+   CallExpression call,
+   ConvertContext context) {
+   if (!(call.getFunctionDefinition() instanceof 
UserDefinedFunction)) {
+   return Optional.empty();
+   }
+
+   switch (call.getFunctionDefinition().getKind()) {
+   case SCALAR:
+   case TABLE:
+   List args = 
call.getChildren().stream().map(context::toRexNode).collect(Collectors.toList());
+   return Optional.of(context.getRelBuilder().call(
+   BridgingSqlFunction.of(
+   context.getDataTypeFactory(),
+   context.getTypeFactory(),
+   SqlKind.OTHER_FUNCTION,
+   
ApiExpressionUtils.getFunctionIdentifier(call),
 
 Review comment:
   It seems wrong to me that we are using ApiUtils in the planner. This should 
be done earlier when constructing the call.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397290669
 
 

 ##
 File path: 
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/runtime/stream/sql/FunctionITCase.java
 ##
 @@ -444,6 +450,31 @@ private void 
testUserDefinedCatalogFunction(TableEnvironment tableEnv, String cr
tableEnv.sqlUpdate("drop table t2");
}
 
+   @Test
+   public void testDataTypeBasedTypeInferenceNotSupported() throws 
Exception {
+   thrown.expect(ValidationException.class);
+   thrown.expectMessage("The new type inference for functions is 
only supported in the Blink planner.");
+
+   StreamExecutionEnvironment streamExecEnvironment = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   StreamTableEnvironment tableEnvironment = 
StreamTableEnvironment.create(streamExecEnvironment);
+
+   tableEnvironment.createTemporarySystemFunction("func", 
SimpleScalarFunction.class);
+   Table table = tableEnvironment
+   .sqlQuery("SELECT func(1)");
+   tableEnvironment.toAppendStream(table, Row.class).print();
+
+   streamExecEnvironment.execute();
+   }
+
+   /**
 
 Review comment:
   remove comment


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397238739
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/ApiExpressionUtils.java
 ##
 @@ -198,4 +204,48 @@ public static boolean isFunction(Expression expression, 
BuiltInFunctionDefinitio
}
return false;
}
+
+   /**
+* Extracts a {@link FunctionIdentifier} for the given {@link 
CallExpression}. If the call is an inline funcion
+* ({@link CallExpression#getFunctionIdentifier()} returns empty)
 
 Review comment:
   Something is wrong with this JavaDoc. Can you verify all parenthesis?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397281589
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/expressions/PlannerExpressionConverter.scala
 ##
 @@ -37,16 +37,29 @@ import _root_.scala.collection.JavaConverters._
 class PlannerExpressionConverter private extends 
ApiExpressionVisitor[PlannerExpression] {
 
   override def visit(call: CallExpression): PlannerExpression = {
-translateCall(call.getFunctionDefinition, call.getChildren.asScala)
+val definition = call.getFunctionDefinition
+translateCall(
+  definition, call.getChildren.asScala,
+  () => if (definition.getKind == FunctionKind.AGGREGATE ||
+definition.getKind == FunctionKind.TABLE_AGGREGATE) {
 
 Review comment:
   nit: fix indention


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397274081
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/UserDefinedFunctionConvertRule.java
 ##
 @@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter;
+
+import org.apache.flink.table.expressions.ApiExpressionUtils;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.functions.UserDefinedFunction;
+import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlKind;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * A call expression converter rule that converts calls to user defined 
functions.
+ */
+public class UserDefinedFunctionConvertRule implements 
CallExpressionConvertRule {
+   @Override
+   public Optional convert(
+   CallExpression call,
+   ConvertContext context) {
+   if (!(call.getFunctionDefinition() instanceof 
UserDefinedFunction)) {
+   return Optional.empty();
+   }
+
+   switch (call.getFunctionDefinition().getKind()) {
+   case SCALAR:
+   case TABLE:
+   List args = 
call.getChildren().stream().map(context::toRexNode).collect(Collectors.toList());
+   return Optional.of(context.getRelBuilder().call(
+   BridgingSqlFunction.of(
+   context.getDataTypeFactory(),
+   context.getTypeFactory(),
+   SqlKind.OTHER_FUNCTION,
+   
ApiExpressionUtils.getFunctionIdentifier(call),
+   call.getFunctionDefinition(),
+   
call.getFunctionDefinition().getTypeInference(context.getDataTypeFactory())),
 
 Review comment:
   This can throw errors. We should wrap the exception again like in 
`FunctionCatalogOperatorTable`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397259142
 
 

 ##
 File path: 
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/utils/factories/CalculatedTableFactory.java
 ##
 @@ -59,82 +59,101 @@ public QueryOperation create(ResolvedExpression callExpr, 
String[] leftTableFiel
return callExpr.accept(calculatedTableCreator);
}
 
-   private class FunctionTableCallVisitor extends 
ResolvedExpressionDefaultVisitor> {
-
-   private String[] leftTableFieldNames;
+   private static class FunctionTableCallVisitor extends 
ResolvedExpressionDefaultVisitor {
+   private List leftTableFieldNames;
+   private static final String ATOMIC_FIELD_NAME = "f0";
 
public FunctionTableCallVisitor(String[] leftTableFieldNames) {
-   this.leftTableFieldNames = leftTableFieldNames;
+   this.leftTableFieldNames = 
Arrays.asList(leftTableFieldNames);
}
 
@Override
-   public CalculatedQueryOperation visit(CallExpression call) {
+   public CalculatedQueryOperation visit(CallExpression call) {
FunctionDefinition definition = 
call.getFunctionDefinition();
if (definition.equals(AS)) {
return unwrapFromAlias(call);
-   } else if (definition instanceof 
TableFunctionDefinition) {
-   return createFunctionCall(
-   (TableFunctionDefinition) definition,
-   Collections.emptyList(),
-   call.getResolvedChildren());
-   } else {
-   return defaultMethod(call);
}
+
+   return createFunctionCall(call, 
Collections.emptyList(), call.getResolvedChildren());
}
 
-   private CalculatedQueryOperation 
unwrapFromAlias(CallExpression call) {
+   private CalculatedQueryOperation unwrapFromAlias(CallExpression 
call) {
List children = call.getChildren();
List aliases = children.subList(1, 
children.size())
.stream()
.map(alias -> 
ExpressionUtils.extractValue(alias, String.class)
.orElseThrow(() -> new 
ValidationException("Unexpected alias: " + alias)))
.collect(toList());
 
-   if (!isFunctionOfKind(children.get(0), TABLE)) {
+   if (!(children.get(0) instanceof CallExpression)) {
throw fail();
}
 
CallExpression tableCall = (CallExpression) 
children.get(0);
-   TableFunctionDefinition tableFunctionDefinition =
-   (TableFunctionDefinition) 
tableCall.getFunctionDefinition();
-   return createFunctionCall(tableFunctionDefinition, 
aliases, tableCall.getResolvedChildren());
+   return createFunctionCall(tableCall, aliases, 
tableCall.getResolvedChildren());
}
 
-   private CalculatedQueryOperation createFunctionCall(
-   TableFunctionDefinition tableFunctionDefinition,
+   private CalculatedQueryOperation createFunctionCall(
+   CallExpression callExpression,
List aliases,
List parameters) {
-   TypeInformation resultType = 
tableFunctionDefinition.getResultType();
 
-   int callArity = resultType.getTotalFields();
-   int aliasesSize = aliases.size();
+   FunctionDefinition functionDefinition = 
callExpression.getFunctionDefinition();
+   FunctionIdentifier functionIdentifier = 
ApiExpressionUtils.getFunctionIdentifier(callExpression);
+   final TableSchema tableSchema = adjustNames(
+   
extractSchema(callExpression.getOutputDataType()),
+   aliases,
+   functionIdentifier);
+
+   return new CalculatedQueryOperation(
+   functionDefinition,
+   functionIdentifier,
+   parameters,
+   tableSchema);
+   }
 
-   String[] fieldNames;
+   private TableSchema extractSchema(DataType resultType) {
 
 Review 

[GitHub] [flink] twalthr commented on a change in pull request #11280: [FLINK-16377][table] Support inline user defined functions in expression dsl

2020-03-24 Thread GitBox
twalthr commented on a change in pull request #11280: [FLINK-16377][table] 
Support inline user defined functions in expression dsl
URL: https://github.com/apache/flink/pull/11280#discussion_r397295153
 
 

 ##
 File path: 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/table/validation/CorrelateValidationTest.scala
 ##
 @@ -68,10 +68,6 @@ class CorrelateValidationTest extends TableTestBase {
 //= throw exception when the called function is a scalar function 

 util.addFunction("func0", Func0)
 
-// Java Table API call
-expectExceptionThrown(
 
 Review comment:
   Why is this not thrown anymore?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services