[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r114236223 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DeferredTypeFlinkTableFunction.scala --- @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.plan.schema + +import java.lang.reflect.Method +import java.util + +import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils +import org.apache.flink.table.functions.{TableFunction => FlinkUDTF} + +import scala.collection.JavaConversions._ + +/** + * A Deferred Type is a Table Function which the result type hasn't been determined yet. + * It will determine the result type after the arguments are passed. + * + * @param tableFunction The Table Function instance + * @param evalMethod The eval() method of the [[tableFunction]] + * @param implicitResultType Implicit result type. + */ +class DeferredTypeFlinkTableFunction( +val tableFunction: FlinkUDTF[_], +val evalMethod: Method, +val implicitResultType: TypeInformation[_]) + extends FlinkTableFunction(tableFunction, evalMethod) { + + val paramTypeInfos = evalMethod.getParameterTypes.toList + + override def getResultType(arguments: util.List[AnyRef]): TypeInformation[_] = { +determineResultType(arguments) + } + + override def getRowType( + typeFactory: RelDataTypeFactory, + arguments: util.List[AnyRef]): RelDataType = { +val resultType = determineResultType(arguments) +val (fieldNames, fieldIndexes, _) = UserDefinedFunctionUtils.getFieldInfo(resultType) +UserDefinedFunctionUtils.buildRelDataType(typeFactory, resultType, fieldNames, fieldIndexes) + } + + private def determineResultType(arguments: util.List[AnyRef]): TypeInformation[_] = { +val resultType = tableFunction.getResultType(arguments, paramTypeInfos) --- End diff -- I've updated the patch. I tried to use a `UserDefinedFunctionUtils.getEvalMethod()` to find the `eval()' method when users call the `apply()`. They would be the consistent behaviors now. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
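The class discussed above defers the row type computation until the call arguments are known, because a dynamic-schema UDTF can only report its result type once it sees the literal arguments. Below is a minimal sketch of such a user-defined function, assuming the `getResultType(arguments, typeInfos)` signature proposed in this review (later comments also discuss a `List[Class[_]]` variant, so the exact final shape may differ); the class name, field layout, and splitting logic are made up for illustration.

```scala
import java.{util => ju}

import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.functions.TableFunction
import org.apache.flink.types.Row

// Hypothetical dynamic-schema UDTF: the second (literal) argument decides how
// many fields each emitted Row has, so the result type can only be determined
// per call site, after the arguments are known.
class FuncDyn extends TableFunction[Row] {

  def eval(str: String, width: Int): Unit = {
    val parts = str.split("#")
    val row = new Row(width)
    var i = 0
    while (i < width) {
      row.setField(i, if (i < parts.length) parts(i) else null)
      i += 1
    }
    collect(row)
  }

  // Signature as proposed in this PR: literal call arguments (nulls for
  // non-literal ones) plus parameter type information. Assumes `width` is
  // always passed as a literal.
  override def getResultType(
      arguments: ju.List[AnyRef],
      typeInfos: ju.List[TypeInformation[_]]): TypeInformation[Row] = {
    val width = arguments.get(1).asInstanceOf[Int]
    new RowTypeInfo(Seq.fill(width)(BasicTypeInfo.STRING_TYPE_INFO): _*)
  }
}
```

With such a function, `funcDyn('c, 1)` and `funcDyn('c, 3)` would produce tables with one and three columns respectively, which is exactly the case the deferred-type wrapper above has to handle.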
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r113861371 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/dataset/DataSetUserDefinedFunctionITCase.scala --- @@ -117,6 +117,187 @@ class DataSetUserDefinedFunctionITCase( } @Test + def testDynamicSchema(): Unit = { --- End diff -- Yes, I agree with you. I've updated the patch accordingly. I will refine these tests later today. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3623: [FLINK-6196] [table] Support dynamic schema in Table Func...
Github user clarkyzl commented on the issue: https://github.com/apache/flink/pull/3623 Hi @fhueske . I've updated the pull request. Could you please also leave some comments on the latest commit? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r112392884 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -382,7 +381,10 @@ object UserDefinedFunctionUtils { implicitResultType: TypeInformation[_], params: Expression*): TableFunctionCall = { val arguments = transformLiteralExpressions(params: _*) -val userDefinedResultType = tableFunction.getResultType(arguments) +val typeInformations = params.map { param => + if (param.valid) param.resultType else null --- End diff -- I think that if the parameter is not resolved, it is not valid.
```
val result = in
  .join(funcDyn('c, 1) as 'name)
  .select('c, 'name)
```
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r112156728 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/schema/DeferredTypeFlinkTableFunction.scala --- @@ -40,6 +43,10 @@ class DeferredTypeFlinkTableFunction( val implicitResultType: TypeInformation[_]) extends FlinkTableFunction(tableFunction, evalMethod) { + val paramTypeInfos = evalMethod.getParameterTypes.map { paramType => +TypeExtractor.getForClass(paramType) --- End diff -- I think using the signature types of the `eval()` method will be better. First, it removes the difference between the two situations. I'd like to try converting the first case to the signatures of the `eval()` method; that will make it easier to identify which method is called. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r112141809 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -382,7 +381,10 @@ object UserDefinedFunctionUtils { implicitResultType: TypeInformation[_], params: Expression*): TableFunctionCall = { val arguments = transformLiteralExpressions(params: _*) -val userDefinedResultType = tableFunction.getResultType(arguments) +val typeInformations = params.map { param => + if (param.valid) param.resultType else null --- End diff -- Actually, the `param.valid` check here is to skip `UnresolvedFieldReference`s. For example,
```
val result = in
  .join(funcDyn('c, 1) as 'name)
  .select('c, 'name)
```
will throw an `UnresolvedException`:
```
org.apache.flink.table.api.UnresolvedException: Calling resultType on class org.apache.flink.table.expressions.UnresolvedFieldReference.
    at org.apache.flink.table.expressions.UnresolvedFieldReference.resultType(fieldExpression.scala:45)
    at org.apache.flink.table.functions.utils.UserDefinedFunctionUtils$$anonfun$10.apply(UserDefinedFunctionUtils.scala:385)
    at org.apache.flink.table.functions.utils.UserDefinedFunctionUtils$$anonfun$10.apply(UserDefinedFunctionUtils.scala:384)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:34)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.flink.table.functions.utils.UserDefinedFunctionUtils$.buildTableFunctionCall(UserDefinedFunctionUtils.scala:384)
    at org.apache.flink.table.functions.TableFunction.apply(TableFunction.scala:91)
    at org.apache.flink.table.runtime.dataset.DataSetUserDefinedFunctionITCase.testDynamicSchema(DataSetUserDefinedFunctionITCase.scala:127)
```
because `'c` is an `Expression` which hasn't been resolved. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
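To make the literal handling above concrete: only literal expressions survive the translation into the `arguments` list, everything else becomes null. A small sketch of that mapping, mirroring the `transformLiteralExpressions` helper quoted later in this thread (the wrapper method name here is just for illustration):

```scala
import org.apache.flink.table.expressions.{Expression, Literal}

import scala.collection.JavaConversions._

// Literal expressions keep their value; anything else (for example an
// unresolved field reference like 'c) is replaced by null.
def toLiteralArguments(params: Expression*): java.util.List[AnyRef] = {
  params.map {
    case literal: Literal => literal.value.asInstanceOf[AnyRef]
    case _ => null
  }
}
```

For `funcDyn('c, 1)` the UDTF would therefore see `[null, 1]`, and the accompanying type information list would likewise carry null at the first position.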
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r112126252 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala --- @@ -135,8 +135,11 @@ abstract class TableFunction[T] extends UserDefinedFunction { * * @param arguments arguments of a function call (only literal arguments * are passed, nulls for non-literal ones) +* @param typeInfos The type information of the parameters. only valid argument types --- End diff -- Yes, I think we should resolve the expressions one more time to get all the TypeInformations. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r110577213 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala --- @@ -135,8 +135,11 @@ abstract class TableFunction[T] extends UserDefinedFunction { * * @param arguments arguments of a function call (only literal arguments * are passed, nulls for non-literal ones) +* @param typeInfos The type information of the parameters. only valid argument types +* are passed, nulls for non-valid ones. * @return [[TypeInformation]] of result type or null if Flink should determine the type */ - def getResultType(arguments: java.util.List[AnyRef]): TypeInformation[T] = null + def getResultType(arguments: java.util.List[AnyRef], +typeInfos: java.util.List[TypeInformation[_]]): TypeInformation[T] = null --- End diff -- Yes, I agree with you. I think a `List[Class[_]]` will be easier for users to parse. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r110317256 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/TableFunction.scala --- @@ -131,8 +129,14 @@ abstract class TableFunction[T] extends UserDefinedFunction { * method. Flink's type extraction facilities can handle basic types or * simple POJOs but might be wrong for more complex, custom, or composite types. * +* The input arguments are the input arguments which are passed to the eval() method. +* Only the literal arguments (constant values) are passed to the [[getResultType()]] method. +* If non-literal arguments appear, it will pass nulls instead. +* +* @param arguments arguments of a function call (only literal arguments +* are passed, nulls for non-literal ones) * @return [[TypeInformation]] of result type or null if Flink should determine the type */ - def getResultType: TypeInformation[T] = null + def getResultType(arguments: java.util.List[AnyRef]): TypeInformation[T] = null --- End diff -- I have updated the pull request and added a `List[TypeInformation[_]]`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r110141028 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -358,4 +366,120 @@ object UserDefinedFunctionUtils { InstantiationUtil .deserializeObject[UserDefinedFunction](byteData, Thread.currentThread.getContextClassLoader) } + + /** +* Build a TableFunctionCall, a name and a sequence of params will determine a unique +* [[TableFunctionCall]] +* +* @param name function name +* @param implicitResultType If no result type returned, it will use this type. +* @param params The input expressions +* @return A unique [[TableFunctionCall]] +*/ + private[table] def buildTableFunctionCall( + name: String, + tableFunction: TableFunction[_], + implicitResultType: TypeInformation[_], + params: Expression*): TableFunctionCall = { +val arguments = transformLiteralExpressions(params: _*) +val userDefinedResultType = tableFunction.getResultType(arguments) +val resultType = { + if (userDefinedResultType != null) userDefinedResultType + else implicitResultType +} +TableFunctionCall(name, tableFunction, params, resultType) + } + + /** +* Transform the expressions to Objects +* Only literal expressions will be transformed, non-literal expressions will be +* translated to nulls. +* +* @param params actual parameters of the function +* @return A java List of the Objects +*/ + private[table] def transformLiteralExpressions(params: Expression*): java.util.List[AnyRef] = { +params.map { + case exp: Literal => +exp.value.asInstanceOf[AnyRef] + case _ => +null +} + } + + /** +* Transform the rex nodes to Objects +* Only literal rex nodes will be transformed, non-literal rex nodes will be +* translated to nulls. +* +* @param rexNodes actual parameters of the function +* @return A java List of the Objects +*/ + private[table] def transformRexNodes( + rexNodes: java.util.List[RexNode]): java.util.List[AnyRef] = { +rexNodes.map { + case rexNode: RexLiteral => +val value = rexNode.getValue2 +rexNode.getType.getSqlTypeName match { + case SqlTypeName.INTEGER => +value.asInstanceOf[Long].toInt.asInstanceOf[AnyRef] + case SqlTypeName.SMALLINT => +value.asInstanceOf[Long].toShort.asInstanceOf[AnyRef] + case SqlTypeName.TINYINT => +value.asInstanceOf[Long].toByte.asInstanceOf[AnyRef] + case SqlTypeName.FLOAT => +value.asInstanceOf[Double].toFloat.asInstanceOf[AnyRef] + case SqlTypeName.REAL => +value.asInstanceOf[Double].toFloat.asInstanceOf[AnyRef] + case _ => --- End diff -- `Timestamp` works fine. When I call `getValue2()` it will generate a `Long` on this. I will add a test on this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
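On the SQL path the same literal arrives as a Calcite `RexLiteral`, which is exactly why the narrowing above (for example `Long` back to `Int` for `INTEGER`) is needed before the value reaches `getResultType`. A hedged usage fragment, assuming the hypothetical `FuncDyn` sketched earlier has been registered on the `TableEnvironment` and a table `MyTable` with a string column `c` exists; `tableEnv.sql(...)` and the `LATERAL TABLE` syntax are the 1.3-era APIs as I recall them:

```scala
// The constant 2 becomes a RexLiteral of SqlTypeName.INTEGER; getValue2()
// yields a Long, which transformRexNodes narrows back to Int before
// getResultType/getRowType sees it.
val result = tableEnv.sql(
  "SELECT c, s1, s2 FROM MyTable, LATERAL TABLE(funcDyn(c, 2)) AS T(s1, s2)")
```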
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3623#discussion_r110086591 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/TableSqlFunction.scala --- @@ -53,22 +54,9 @@ class TableSqlFunction( paramTypes, functionImpl) { - /** -* Get the user-defined table function. -*/ - def getTableFunction = udtf - - /** -* Get the type information of the table returned by the table function. -*/ - def getRowTypeInfo = rowTypeInfo - - /** -* Get additional mapping information if the returned table type is a POJO -* (POJO types have no deterministic field order). -*/ - def getPojoFieldMapping = functionImpl.fieldIndexes --- End diff -- Yes, POJOs are still supported. The POJO field mappings are re-calculated from the dynamic schema; you can see this in `DataSetCorrelate` and `DataStreamCorrelate`. Also, all the POJO tests still pass. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3623: [FLINK-6196] [table] Support dynamic schema in Tab...
GitHub user clarkyzl opened a pull request: https://github.com/apache/flink/pull/3623 [FLINK-6196] [table] Support dynamic schema in Table Function Type: Improvement Priority: Major Components: table, udtf Problem Definition: Support dynamic schema in Table Function Design: 1. Modified the getResultType() interface of a UDTF to accept a Java-style list of arguments; only literals are passed to the UDTF. 1. Define TableFunction and TableFunctionCall clearly: a TableFunction is an object whose result type and parameters are not yet determined, while a TableFunctionCall is an object whose result type and parameters are determined. 1. Implement the Table API, Expression, and SQL style call stacks for getResultType(). Impact Analysis: The UDTF interface has changed. Test: All tests done. You can merge this pull request into a Git repository by running: $ git pull https://github.com/clarkyzl/flink flink-6196 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3623.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3623 commit 9be655dec415a720051b85ae78e7f052a6e22f81 Author: Zhuoluo Yang Date: 2017-03-14T08:44:02Z [FLINK-6039] [core] Row of TableFunction should support flexible number of fields commit a87721b18a6972cd0b80384a0812df27b5253f9c Author: Zhuoluo Yang Date: 2017-03-16T06:34:39Z [FLINK-6039] [core] Revert some modifications commit 140e6c76b51a80b35c1071590b19aefbf65990eb Author: Zhuoluo Yang Date: 2017-03-17T16:56:13Z [FLINK-6039] [core] Support dynamic schema of TableFunction commit 6ac05e217acb6c0c73a1ea978891ff88d78bc0a8 Author: Zhuoluo Yang Date: 2017-03-22T03:49:49Z [FLINK-6039] [core] Fix the dynamic schema of Table API commit 1c88362c714c7f4e6f0a56397e9cf2801abfcf0a Author: Zhuoluo Yang Date: 2017-03-22T08:29:31Z [FLINK-6039] [core] Fix build exception commit eba7ba8cbd7c576510e649f1b4ecad2a55300f82 Author: Zhuoluo Yang Date: 2017-03-22T09:53:28Z [FLINK-6039] [core] fix comments and test failure commit 768d03d09267ce46b789cf58add9f49455d01585 Author: Zhuoluo Yang Date: 2017-03-23T06:00:13Z Add a test case for the test commit 6fe805f3b1f83c66c02dca8c65892b7b8d48f3e4 Author: Zhuoluo Yang Date: 2017-03-27T14:25:36Z [FLINK-6196] [table] Support dynamic schema in Table Function --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
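A hedged end-to-end sketch of how the Table API side of this change is meant to be used, based on the `funcDyn('c, 1) as 'name` snippet quoted in the review comments above; the dataset contents are made up and `FuncDyn` is the hypothetical dynamic-schema UDTF sketched earlier in this thread:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)

// 'c holds strings like "Jack#22"; the literal 1 drives the result schema.
val in = env
  .fromElements((1, 1L, "Jack#22"), (2, 2L, "John#19"))
  .toTable(tEnv, 'a, 'b, 'c)

val funcDyn = new FuncDyn
val result = in
  .join(funcDyn('c, 1) as 'name)
  .select('c, 'name)
```

Calling the same `funcDyn` with a different literal, say `funcDyn('c, 2) as ('name, 'age)`, would yield a two-column schema, which is the "dynamic schema" this PR is about.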
[GitHub] flink pull request #3553: [FLINK-6068] [table] Support If() in built in func...
GitHub user clarkyzl opened a pull request: https://github.com/apache/flink/pull/3553 [FLINK-6068] [table] Support If() in built in function of TableAPI Type: Improvement Priority: Major Components: table, udf Problem Definition: We didn't register if() in the function category, so the syntax 'if(a, b, c)' was not supported. Design: 1. Register If in the function category. 1. Add some tests. Impact Analysis: A newly registered function, with no further impact on other features. Test: `mvn clean verify` is done. You can merge this pull request into a Git repository by running: $ git pull https://github.com/clarkyzl/flink flink-6068 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3553.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3553 commit fcc82d7ff106af7315d8e0f57993212e8221e9e9 Author: Zhuoluo Yang Date: 2017-03-16T08:07:33Z [FLINK-6068] [table] Support If() in built in function of TableAPI --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3530: [FLINK-6040] [table] DataStreamUserDefinedFunction...
GitHub user clarkyzl opened a pull request: https://github.com/apache/flink/pull/3530 [FLINK-6040] [table] DataStreamUserDefinedFunctionITCase occasionally fails Type: Bug Priority: Major Components: table, test Problem Definition: DataStreamUserDefinedFunctionITCase occasionally fails Design: Three test cases in `DataStreamUserDefinedFunctionITCase` forgot to call the `StreamITCase.clear` method. This can cause a test to fail occasionally, because the result of one case may sometimes affect another. The patch simply adds the missing calls. Impact Analysis: Only the tests; no impact on the core code. Test: Tested `DataStreamUserDefinedFunctionITCase` locally. You can merge this pull request into a Git repository by running: $ git pull https://github.com/clarkyzl/flink flink-6040 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3530.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3530 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
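For context, the isolation pattern the missing calls break: `StreamITCase` collects results in a shared static buffer, so each test has to clear it before running. A rough sketch of the pattern, assuming the test utility's package path from memory (it may differ slightly in the actual source tree):

```scala
import org.apache.flink.table.api.scala.stream.utils.StreamITCase
import org.junit.Test

class SomeUdfITCase {

  @Test
  def testSomething(): Unit = {
    // Without this call, results buffered by a previously executed test can
    // leak into this test's expected/actual comparison.
    StreamITCase.clear
    // ... build the stream program, add the StreamITCase sink, execute,
    // then compare expectations against StreamITCase.testResults ...
  }
}
```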
[GitHub] flink pull request #3529: [FLINK-6039] [core] Row of TableFunction should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3529#discussion_r105861507 --- Diff: flink-core/src/main/java/org/apache/flink/types/Row.java --- @@ -66,10 +66,11 @@ public int getArity() { * Gets the field at the specified position. * @param pos The position of the field, 0-based. * @return The field at the specified position. -* @throws IndexOutOfBoundsException Thrown, if the position is negative, or equal to, or larger than the number of fields. +* Return null if the position is equal to, or larger than the number of fields. +* @throws IndexOutOfBoundsException Thrown, if the position is negative. */ public Object getField(int pos) { - return fields[pos]; + return pos >= fields.length ? null : fields[pos]; --- End diff -- Hi @fhueske. Thank you for your comments. The check adds the overhead of an additional `if` here, so it may not be an efficient way. IMHO, the design of `TableFunction` should support arbitrary output `Row` types; however, too many kinds of `TableFunction`s may also confuse users. I will investigate whether there is a concise and efficient way. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3529: [FLINK-6039] [core] Row of TableFunction should support f...
Github user clarkyzl commented on the issue: https://github.com/apache/flink/pull/3529 I didn't change the behavior of `org.apache.flink.types.Record`. I think we can open another issue if necessary. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3529: [FLINK-6039] [core] Row of TableFunction should su...
GitHub user clarkyzl opened a pull request: https://github.com/apache/flink/pull/3529 [FLINK-6039] [core] Row of TableFunction should support flexible number of fields Type: Improvement Priority: Major Components: core, table, udtf Problem Definition: Row of TableFunction should support a flexible number of fields. The number of fields should not be fixed; the framework should allow the number of fields in the emitted `Row`s to differ from the number of `TypeInformation`s in the result type. Design: Before the patch, accessing a position larger than or equal to the number of fields in a Row would throw an IndexOutOfBoundsException. After the patch, it returns null. Impact Analysis: The behavior of Row#getField has changed. Test: `mvn clean verify` is done. The cases `TableFunc4` and `TableFunc6` pass whether or not this patch is applied; they are there to make sure the scenario works. The case `TableFunc5` succeeds only after the patch is applied. You can merge this pull request into a Git repository by running: $ git pull https://github.com/clarkyzl/flink flink-6039 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3529.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3529 commit 9814ec01ed0f8031f055bea63e01d713f56945a2 Author: Zhuoluo Yang Date: 2017-03-14T08:44:02Z [FLINK-6039] [core] Row of TableFunction should support flexible number of fields --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
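A minimal sketch of the behavior change described above, using the `Row` API from flink-core:

```scala
import org.apache.flink.types.Row

val row = new Row(2)
row.setField(0, "a")
row.setField(1, 42)

row.getField(0)  // "a"
row.getField(1)  // 42
row.getField(5)  // before this patch: IndexOutOfBoundsException; after: null
row.getField(-1) // still throws, since negative positions remain invalid
```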
[GitHub] flink issue #3389: [FLINK-5881] [table] ScalarFunction(UDF) should support v...
Github user clarkyzl commented on the issue: https://github.com/apache/flink/pull/3389 Hi @twalthr. I've updated the patch per your comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3456: [FLINK-5832] [table] Support for simple hive UDF
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3456#discussion_r105095839 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -92,6 +92,11 @@ under the License. + + org.apache.hive --- End diff -- When I rebased the patch onto #3389, the issue disappeared. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3495: [FLINK-5781] Generation HTML from ConfigOption
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3495#discussion_r105077753 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/ConfigOption.java --- @@ -137,6 +153,26 @@ public boolean hasDeprecatedKeys() { return deprecatedKeys == EMPTY ? Collections.emptyList() : Arrays.asList(deprecatedKeys); } + public String shortDescription() { + return shortDescription; + } + + public String description() { + return description; + } + + String toHTMLString(boolean includeShort) { + String stringBuilder = "" + --- End diff -- I think we should use `java.lang.StringBuilder`. ```java StringBuilder sb = new StringBuilder(); sb.append("") .append("").append(key).append(""); ``` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3497: [FLINK-6002] Documentation: 'MacOS X' section in Quicksta...
Github user clarkyzl commented on the issue: https://github.com/apache/flink/pull/3497 +1 LGTM. Thanks @phoenixjiangnan --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3490: Update Docker to use the latest 1.1.x version of F...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3490#discussion_r104848459 --- Diff: flink-contrib/docker-flink/Dockerfile --- @@ -22,7 +22,7 @@ FROM java:8-jre-alpine RUN apk add --no-cache bash snappy # Configure Flink version -ARG FLINK_VERSION=1.1.3 +ARG FLINK_VERSION=1.1.4 --- End diff -- Could you please explain why we have to do this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3455: [FLINK-5134] [runtime] [FLIP-6] Aggregate Resource...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3455#discussion_r104829653 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java --- @@ -208,4 +209,36 @@ public Integer map(Integer value) throws Exception { assertFalse(printConfig.isChainStart()); assertTrue(printConfig.isChainEnd()); } + +// /** --- End diff -- According to Apache's rules, don't comment out code. If you would like to delete it, please delete it; Git logs and pull requests will show the changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3407: [FLINK-5882] [table] TableFunction (UDTF) should support ...
Github user clarkyzl commented on the issue: https://github.com/apache/flink/pull/3407 Hi @KurtYoung. Thanks for your review. I've pushed another commit. This patch is based on https://github.com/apache/flink/pull/3389. If we can get https://github.com/apache/flink/pull/3389 merged first, this patch will look simpler. @twalthr, could you please take a look at https://github.com/apache/flink/pull/3389 ? Thank you all so much. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3456: [FLINK-5832] [table] Support for simple hive UDF
Github user clarkyzl commented on the issue: https://github.com/apache/flink/pull/3456 Hi @twalthr, @KurtYoung. This feature is based on https://github.com/apache/flink/pull/3389 . If we can get https://github.com/apache/flink/pull/3389 merged first, this patch will look simpler. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r104609713 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala --- @@ -44,14 +44,24 @@ class ScalarFunctionCallGen( operands: Seq[GeneratedExpression]) : GeneratedExpression = { // determine function signature and result class -val matchingSignature = getSignature(scalarFunction, signature) +val matchingMethod = getEvalMethod(scalarFunction, signature) .getOrElse(throw new CodeGenException("No matching signature found.")) +val matchingSignature = matchingMethod.getParameterTypes val resultClass = getResultTypeClass(scalarFunction, matchingSignature) +// zip for variable signatures +var paramToOperands = matchingSignature.zip(operands) +var i = paramToOperands.length +while (i < operands.length --- End diff -- Sure. I think there will be something similar in https://github.com/apache/flink/pull/3407 (FLINK-5882) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3389: [FLINK-5881] [table] ScalarFunction(UDF) should support v...
Github user clarkyzl commented on the issue: https://github.com/apache/flink/pull/3389 Thanks @wuchong, @twalthr and @KurtYoung. I've pushed a revised patch addressing @KurtYoung's recent review comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r104605350 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -140,6 +138,26 @@ object UserDefinedFunctionUtils { s"one method named 'eval' which is public, not abstract and " + s"(in case of table functions) not static.") } else { + methods.foreach(method => { +val signatures = method.getParameterTypes +if (signatures.nonEmpty && +signatures.last.getName.equals("scala.collection.Seq") && +// If users specified an @varargs, Scala will generate two methods indeed. +// If there does not exists corresponding varargs method of the Seq method, --- End diff -- Typo. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3473: [FLINK-5833] [table] Support for Hive GenericUDF
GitHub user clarkyzl opened a pull request: https://github.com/apache/flink/pull/3473 [FLINK-5833] [table] Support for Hive GenericUDF Type: New Feature Priority: Major Components: table, udf Problem definition: Make Flink call Hive User-Defined Functions, support for simple hive Generic UDFs. Design: 1. This patch is based on FLINK-5881 and FLINK-5832, we need variable arguments to call hive udfs. We also need a HiveFunctionWrapper in FLINK-5832 to create Hive Generic UDFs. 1. Added a ScalarFunction called HiveGenericUDF to call Hive Generic UDFs. Use primitive java object inspectors for the generic UDFs. 1. Moved the code to flink-hcatalog Impact Analysis: A new feature, had few impacts on exsting features. We updated hcatalog dependencies from 0.12.0 to 0.13.0. Test: `mvn clean verify` is done on local. You can merge this pull request into a Git repository by running: $ git pull https://github.com/clarkyzl/flink flink-5833 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3473.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3473 commit 60b68fdd66f8021f6f090e7372987d43362d5ef3 Author: Zhuoluo Yang Date: 2017-02-22T10:53:34Z [FLINK-5881] [table] ScalarFunction(UDF) should support variable types and variable arguments commit fe81a7d87d2620d19e5dd0fa569f139569b6c2aa Author: Zhuoluo Yang Date: 2017-02-23T11:29:37Z [FLINK-5881] [table] Modification as reviews commit 21f8a4ec40a12828e59ff959b56d92c2c2629afd Author: Zhuoluo Yang Date: 2017-02-20T06:04:12Z [FLINK-5832] [table] Support for simple hive UDF commit d734867874169ffd23f084fbdd7f8605208f2b37 Author: Zhuoluo Yang Date: 2017-02-27T05:24:33Z [FLINK-5832] [table] Delete test code commit 3b345f7d19054b7cbb961498d3f817fcaae128b5 Author: Zhuoluo Yang Date: 2017-02-28T07:54:39Z [FLINK-5832] [table] Use scala to implement HiveSimpleUDF commit 8838965d0add8ae1e47d216909e20ccd3d7fcd17 Author: Zhuoluo Yang Date: 2017-02-28T07:56:36Z [FLINK-5832] [table] Remove unlicensed files commit 260bf3b302e5753845ed30fe89b1516121bc3562 Author: Zhuoluo Yang Date: 2017-02-28T09:22:01Z [FLINK-5832] [table] match the signature, varargs commit fcda8546509cb755e101ef21a12b905f97cadf72 Author: Zhuoluo Yang Date: 2017-03-01T07:30:15Z [FLINK-5881] [table] Modification as Jark's two comments commit 1eca2a511de8eeb734f029887f2e0831f9092a7b Author: Zhuoluo Yang Date: 2017-03-01T08:13:40Z Merge branch 'flink-5881' into udf commit 13b2039fcc8785b3b27a3d1537daec1c69aec944 Author: Zhuoluo Yang Date: 2017-03-01T10:40:58Z [FLINK-5832] [table] still needs some ois commit 80123fe9f82c989f74ec1d5194bc1609cfd207aa Author: Zhuoluo Yang Date: 2017-03-02T03:28:38Z [FLINK-5832] [table] Use FunctionRegistry to call HiveSimpleUDFs commit b1cb548aa9f89f1ceef896bd8583a65054190795 Author: Zhuoluo Yang Date: 2017-03-02T08:00:31Z [FLINK-5832] [table] Use PrimitiveObjectInspectors and add some unit tests commit 97968fb86af105f0fc8b68e8083998dd0725deef Author: Zhuoluo Yang Date: 2017-03-02T09:16:03Z [FLINK-5832] [table] remove useless name variable and add more tests commit 40ed7db039b86a5f39fc157fa36d8b55bf6f2bd4 Author: Zhuoluo Yang Date: 2017-03-03T08:15:30Z [FLINK-5832] [table] move all the code to flink-hcatalog commit 21f4c69e962ea8e4ca9083be4a6837dfa84df1cd Author: Zhuoluo Yang Date: 2017-03-03T08:59:05Z [FLINK-5832] [table] Remove newer version of hive udfs which does not exist in flink-hcatalog commit 
c86effa5cc9b23c7d058f7d01bf8f539c539a116 Author: Zhuoluo Yang Date: 2017-03-04T03:55:34Z [FLINK-5833] [table] Support for Hive GenericUDF --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3456: [FLINK-5832] [table] Support for simple hive UDF
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3456#discussion_r104109600 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -92,6 +92,11 @@ under the License. + + org.apache.hive --- End diff -- Hi @twalthr. After moving the code to flink-hcatalog, I found that `mvn clean verify` fails the tests. The reason is that the code depends on FLINK-5881, but flink-hcatalog depends on the `1.3-SNAPSHOT` artifact from the Maven repository, where the FLINK-5881 patch isn't available yet. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3456: [FLINK-5832] [table] Support for simple hive UDF
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3456#discussion_r104106388 --- Diff: flink-libraries/flink-table/pom.xml --- @@ -92,6 +92,11 @@ under the License. + + org.apache.hive --- End diff -- Hi @twalthr . Thank you for your suggestions. I've moved all the code to hcatalog in commit 97968fb86af105f0fc8b68e8083998dd0725deef. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3456: [FLINK-5832] [table] Support for simple hive UDF
GitHub user clarkyzl opened a pull request: https://github.com/apache/flink/pull/3456 [FLINK-5832] [table] Support for simple hive UDF Type: New Feature Priority: Major Components: table, udf, hive Problem definition: Make Flink able to call Hive user-defined functions; support simple Hive UDFs. Design: 1. This patch is based on FLINK-5881; we need variable arguments to call Hive UDFs. 2. Added a HiveFunctionWrapper to create Hive simple UDFs. 3. Added a ScalarFunction called HiveSimpleUDF to call Hive simple UDFs. Use primitive Java object inspectors for the simple UDFs. 4. A few modifications of the type comparison, making the Object type match any type. 5. Added a new method in TableEnvironment to register Hive UDFs. Impact Analysis: A new feature with few impacts on existing features, except for the type comparison. Test: `mvn clean verify` is done locally. You can merge this pull request into a Git repository by running: $ git pull https://github.com/clarkyzl/flink udf Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3456.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3456 commit 60b68fdd66f8021f6f090e7372987d43362d5ef3 Author: Zhuoluo Yang Date: 2017-02-22T10:53:34Z [FLINK-5881] [table] ScalarFunction(UDF) should support variable types and variable arguments commit fe81a7d87d2620d19e5dd0fa569f139569b6c2aa Author: Zhuoluo Yang Date: 2017-02-23T11:29:37Z [FLINK-5881] [table] Modification as reviews commit 21f8a4ec40a12828e59ff959b56d92c2c2629afd Author: Zhuoluo Yang Date: 2017-02-20T06:04:12Z [FLINK-5832] [table] Support for simple hive UDF commit d734867874169ffd23f084fbdd7f8605208f2b37 Author: Zhuoluo Yang Date: 2017-02-27T05:24:33Z [FLINK-5832] [table] Delete test code commit 3b345f7d19054b7cbb961498d3f817fcaae128b5 Author: Zhuoluo Yang Date: 2017-02-28T07:54:39Z [FLINK-5832] [table] Use scala to implement HiveSimpleUDF commit 8838965d0add8ae1e47d216909e20ccd3d7fcd17 Author: Zhuoluo Yang Date: 2017-02-28T07:56:36Z [FLINK-5832] [table] Remove unlicensed files commit 260bf3b302e5753845ed30fe89b1516121bc3562 Author: Zhuoluo Yang Date: 2017-02-28T09:22:01Z [FLINK-5832] [table] match the signature, varargs commit fcda8546509cb755e101ef21a12b905f97cadf72 Author: Zhuoluo Yang Date: 2017-03-01T07:30:15Z [FLINK-5881] [table] Modification as Jark's two comments commit 1eca2a511de8eeb734f029887f2e0831f9092a7b Author: Zhuoluo Yang Date: 2017-03-01T08:13:40Z Merge branch 'flink-5881' into udf commit 13b2039fcc8785b3b27a3d1537daec1c69aec944 Author: Zhuoluo Yang Date: 2017-03-01T10:40:58Z [FLINK-5832] [table] still needs some ois commit 80123fe9f82c989f74ec1d5194bc1609cfd207aa Author: Zhuoluo Yang Date: 2017-03-02T03:28:38Z [FLINK-5832] [table] Use FunctionRegistry to call HiveSimpleUDFs commit b1cb548aa9f89f1ceef896bd8583a65054190795 Author: Zhuoluo Yang Date: 2017-03-02T08:00:31Z [FLINK-5832] [table] Use PrimitiveObjectInspectors and add some unit tests --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
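For context, the kind of "simple" Hive UDF this wrapper targets: Hive, and presumably the HiveSimpleUDF/HiveFunctionWrapper classes mentioned in the description, locate the `evaluate()` methods reflectively rather than through ObjectInspectors. The example class itself is made up; the Flink-side registration API is not shown because it was still under review:

```scala
import org.apache.hadoop.hive.ql.exec.UDF

// A classic Hive "simple" UDF: no ObjectInspectors, just overloaded
// evaluate() methods resolved by reflection.
class HiveUpper extends UDF {
  def evaluate(s: String): String =
    if (s == null) null else s.toUpperCase
}
```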
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r103632353 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -140,6 +138,26 @@ object UserDefinedFunctionUtils { s"one method named 'eval' which is public, not abstract and " + s"(in case of table functions) not static.") } else { + methods.foreach(method => { +val signatures = method.getParameterTypes +if (signatures.nonEmpty && +signatures.last.getName.equals("scala.collection.Seq") && +// If users specified an @varargs, Scala will generate two methods indeed. +// If there does not exists corresponding varargs method of the Seq method, +// we will throw an ValidationException. +(!methods.exists(m => { + val sigs = m.getParameterTypes + m.isVarArgs && + sigs.length == signatures.length && + sigs.zipWithIndex.forall { case (sig, i) => + i == sigs.length - 1 || sig.equals(signatures(i)) + } +}))) { + throw new ValidationException("The 'eval' method do not support Scala type of " + --- End diff -- I've updated the patch and refactored the logic. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r103632469 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala --- @@ -112,9 +112,16 @@ object ScalarSqlFunction { .getParameterTypes(foundSignature) .map(typeFactory.createTypeFromTypeInfo) -inferredTypes.zipWithIndex.foreach { - case (inferredType, i) => -operandTypes(i) = inferredType +operandTypes.zipWithIndex.foreach { + case (_, i) => +if (i < inferredTypes.length - 1) { + operandTypes(i) = inferredTypes(i) +} else if (null != inferredTypes.last.getComponentType) { + // last arguments is a collection, the array type + operandTypes(i) = inferredTypes.last.getComponentType +} else { + operandTypes(i) = inferredTypes.last +} --- End diff -- The logic here has also been changed. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r103369181 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -140,6 +138,24 @@ object UserDefinedFunctionUtils { s"one method named 'eval' which is public, not abstract and " + s"(in case of table functions) not static.") } else { + var trailingSeq = false + var noVargs = true + methods.foreach(method => { +val signatures = method.getParameterTypes +if (signatures.nonEmpty) { + if (method.isVarArgs) { +noVargs = false + } else if (signatures.last.getName.equals("scala.collection.Seq")) { +trailingSeq = true + } +} + }) + if (trailingSeq && noVargs) { +// We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..." +throw new ValidationException("The 'eval' method do not support Scala type of " + --- End diff -- I think one of the important things here is to check type by type. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3407: [FLINK-5882] [table] TableFunction (UDTF) should s...
GitHub user clarkyzl opened a pull request: https://github.com/apache/flink/pull/3407 [FLINK-5882] [table] TableFunction (UDTF) should support variable types and variable arguments Type: New Feature Priority: Major Components: table, udtf, TableFunction Problem Definition: TableFunction (UDTF) should support variable types and variable arguments Design: 1. This pull request is based on FLINK-5881 2. The only modification is in TableFunctionCallGen, and this part is similar to ScalarFunctionCallGen. It makes the code generator accept the variable arguments without losing any of them. 3. Add some tests, both stream and dataset. Impact Analysis: The only modification is located in TableFunctionCallGen; the other part is based on FLINK-5881. Test: Unit tests done. You can merge this pull request into a Git repository by running: $ git pull https://github.com/clarkyzl/flink flink-5882 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3407.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3407 commit 9af14d2be30edc5a3226ee60f218757ae029de21 Author: Zhuoluo Yang Date: 2017-02-22T10:53:34Z [FLINK-5881] [table] ScalarFunction(UDF) should support variable types and variable arguments commit e5b85bec9a7becdfacef70599cd1a4ea4c5d Author: Zhuoluo Yang Date: 2017-02-23T11:29:37Z [FLINK-5881] [table] Modification as reviews commit df4eab12d7657e971a2e6e72e0d1bd80a98ea8b0 Author: Zhuoluo Yang Date: 2017-02-24T08:17:50Z [FLINK-5882] [table] TableFunction (UDTF) should support variable types and variable arguments --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
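A hedged sketch of the kind of variable-argument `eval()` this PR enables for a `TableFunction`; the function itself is made up, and `@varargs` makes Scala also emit the array-backed signature the code generator matches against (see the `@scala.annotation.varargs` discussion further down in this thread):

```scala
import scala.annotation.varargs

import org.apache.flink.table.functions.TableFunction

// Emits one row per argument value; @varargs additionally generates the
// Java-style eval(String[]) signature next to eval(scala.collection.Seq).
class ExplodeArgs extends TableFunction[String] {
  @varargs
  def eval(values: String*): Unit = values.foreach(collect)
}
```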
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102864763 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -140,6 +138,24 @@ object UserDefinedFunctionUtils { s"one method named 'eval' which is public, not abstract and " + s"(in case of table functions) not static.") } else { + var trailingSeq = false + var noVargs = true + methods.foreach(method => { +val signatures = method.getParameterTypes +if (signatures.nonEmpty) { + if (method.isVarArgs) { +noVargs = false + } else if (signatures.last.getName.equals("scala.collection.Seq")) { +trailingSeq = true + } +} + }) + if (trailingSeq && noVargs) { +// We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..." +throw new ValidationException("The 'eval' method do not support Scala type of " + --- End diff -- This is correct, because if multiple methods are found (overloaded), it will throw another exception. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102865002 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -78,20 +78,7 @@ object UserDefinedFunctionUtils { function: UserDefinedFunction, signature: Seq[TypeInformation[_]]) : Option[Array[Class[_]]] = { -// We compare the raw Java classes not the TypeInformation. -// TypeInformation does not matter during runtime (e.g. within a MapFunction). -val actualSignature = typeInfoToClass(signature) -val signatures = getSignatures(function) - -signatures - // go over all signatures and find one matching actual signature - .find { curSig => - // match parameters of signature to actual parameters - actualSignature.length == curSig.length && -curSig.zipWithIndex.forall { case (clazz, i) => - parameterTypeEquals(actualSignature(i), clazz) -} -} --- End diff -- I deleted them because both methods were simple copy-and-paste duplicates: one was used for ScalarFunction, the other for TableFunction. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3389: [FLINK-5881] [table] ScalarFunction(UDF) should support v...
Github user clarkyzl commented on the issue: https://github.com/apache/flink/pull/3389 If the following scenario happens,
```java
public int eval(String a, int... b) {
  return b.length;
}

public String eval(String c) {
  return c;
}
```
It will throw a ValidationException --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102632219 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala --- @@ -48,10 +48,16 @@ class ScalarFunctionCallGen( .getOrElse(throw new CodeGenException("No matching signature found.")) val resultClass = getResultTypeClass(scalarFunction, matchingSignature) +// zip for variable signatures +var paramToOperands = matchingSignature.zip(operands) +var i = paramToOperands.length +while (i < operands.length) { + paramToOperands = paramToOperands :+ (matchingSignature.head, operands(i)) --- End diff -- I will try to use `getComponentType()` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
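The `getComponentType()` idea mentioned above can be sketched generically: once the fixed parameters are zipped with the operands, any extra operands should be matched against the component type of the trailing (vararg) array parameter rather than against the first parameter. This is only an illustration of the pairing logic, not the actual codegen code:

```scala
// Assumes a non-empty signature whose last parameter is the vararg array.
def pairOperands[T](signature: Array[Class[_]], operands: Seq[T]): Seq[(Class[_], T)] = {
  val varargComponent =
    if (signature.last.isArray) signature.last.getComponentType else signature.last
  val fixed = signature.toSeq.zip(operands)
  val extra = operands.drop(signature.length).map(op => (varargComponent: Class[_], op))
  fixed ++ extra
}
```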
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102626945 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala --- @@ -48,10 +48,16 @@ class ScalarFunctionCallGen( .getOrElse(throw new CodeGenException("No matching signature found.")) val resultClass = getResultTypeClass(scalarFunction, matchingSignature) +// zip for variable signatures +var paramToOperands = matchingSignature.zip(operands) +var i = paramToOperands.length +while (i < operands.length) { + paramToOperands = paramToOperands :+ (matchingSignature.head, operands(i)) --- End diff -- Thanks, @wuchong. Yes, it's a mistake here, and the tests haven't covered this situation. Since the maximum number of arguments is 254, I don't think it is necessary to use the component type at the code generation phase. I will try to add some tests to cover this situation.
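As a rough idea of what using `getComponentType()` here could look like, a hedged sketch (names and types are illustrative, not the actual `ScalarFunctionCallGen` code):

```scala
// Pair each operand with a declared parameter class. Operands beyond the fixed
// parameters are assumed to feed the trailing varargs array, so they are paired
// with the array's component type rather than with the array type itself.
def zipWithVarargs(signature: Seq[Class[_]], operands: Seq[AnyRef]): Seq[(Class[_], AnyRef)] =
  signature.lastOption match {
    case Some(last) if last.isArray =>
      val fixedCount = signature.length - 1
      signature.take(fixedCount).zip(operands.take(fixedCount)) ++
        operands.drop(fixedCount).map(op => (last.getComponentType, op))
    case _ =>
      // No varargs parameter: a plain positional zip is enough.
      signature.zip(operands)
  }
```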
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102628732 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala --- @@ -181,6 +181,22 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase { } @Test + def testVariableArgs(): Unit = { +testAllApis( --- End diff -- Sure, thanks @wuchong. As we discussed above, I will add more tests here.
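A hedged sketch of the kind of extra test that could live inside `UserDefinedScalarFunctionTest` (the function `VarArgsFunc`, the field values, and the expected result are assumptions; it relies on `testAllApis` taking a Table API expression, a Table API expression string, a SQL string, and an expected value):

```scala
@Test
def testVariableArgsWithFields(): Unit = {
  // Assumes VarArgsFunc is a @varargs ScalarFunction in scope that concatenates
  // its String arguments, and that f0, f1, f2 hold "a", "b", "c" in the test row.
  testAllApis(
    VarArgsFunc('f0, 'f1, 'f2),
    "VarArgsFunc(f0, f1, f2)",
    "VarArgsFunc(f0, f1, f2)",
    "abc")
}
```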
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102628111 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -140,6 +147,25 @@ object UserDefinedFunctionUtils { s"one method named 'eval' which is public, not abstract and " + s"(in case of table functions) not static.") } else { + var trailingSeq = false + var trailingArray = false + methods.foreach(method => { +val signatures = method.getParameterTypes +if (signatures.nonEmpty) { + val trailingArg = signatures(signatures.length - 1) + if (trailingArg.getName.equals("scala.collection.Seq")) { +trailingSeq = true + } else if (trailingArg.isArray) { +trailingArray = true + } +} + }) + if (trailingSeq && !trailingArray) { --- End diff -- If users add the annotation `@scala.annotation.varargs`, Scala will generate two signatures for the method: one is `T eval(scala.collection.Seq args)`, the other is `T eval(T[] args)`. A better idea is to compare every argument of the signatures, so we can make sure that either there is only one method `T eval(T[] args)`, or that there are two methods, `T eval(scala.collection.Seq args)` and `T eval(T[] args)`.
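A sketch of the comparison described above, i.e. only flag a trailing `scala.collection.Seq` signature when no companion array signature with the same leading parameters exists (method name and structure are illustrative, not the final patch):

```scala
import java.lang.reflect.Method

// Returns true when some eval method ends in scala.collection.Seq but has no
// companion method whose parameters are identical except for a trailing array —
// i.e. the user wrote Type* without the @varargs annotation.
def hasScalaVarargsWithoutAnnotation(methods: Array[Method]): Boolean = {
  val signatures = methods.map(_.getParameterTypes).filter(_.nonEmpty)
  val seqSignatures = signatures.filter(_.last.getName == "scala.collection.Seq")
  seqSignatures.exists { seqSig =>
    !signatures.exists { other =>
      other.length == seqSig.length &&
        other.last.isArray &&
        other.init.sameElements(seqSig.init)
    }
  }
}
```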
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102627293 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -87,10 +87,16 @@ object UserDefinedFunctionUtils { // go over all signatures and find one matching actual signature .find { curSig => // match parameters of signature to actual parameters - actualSignature.length == curSig.length && + (actualSignature.length == curSig.length && curSig.zipWithIndex.forall { case (clazz, i) => parameterTypeEquals(actualSignature(i), clazz) -} +}) || +// matching the style which last argument is variable, eg. "Type..." "Type*" +(actualSignature.length >= curSig.length && --- End diff -- Thanks @wuchong. I will do some tests and revisions to handle this.
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102627751 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -87,10 +87,16 @@ object UserDefinedFunctionUtils { // go over all signatures and find one matching actual signature .find { curSig => // match parameters of signature to actual parameters - actualSignature.length == curSig.length && + (actualSignature.length == curSig.length && curSig.zipWithIndex.forall { case (clazz, i) => parameterTypeEquals(actualSignature(i), clazz) -} +}) || +// matching the style which last argument is variable, eg. "Type..." "Type*" +(actualSignature.length >= curSig.length && + curSig.zipWithIndex.forall { case (clazz, i) => + parameterTypeEquals(actualSignature(i), clazz) || +(i == curSig.length - 1 && clazz.isArray) --- End diff -- Thanks @wuchong. They are good suggestions.
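A hedged sketch of what the suggested varargs match could look like, assuming `parameterTypeEquals` compares an actual class against a declared parameter class as in the surrounding code:

```scala
// A candidate signature matches in varargs style when its last parameter is an
// array, all fixed parameters match positionally, and every remaining actual
// argument is compatible with the array's component type.
def matchesVarargs(actual: Array[Class[_]], declared: Array[Class[_]]): Boolean =
  declared.nonEmpty && declared.last.isArray &&
    actual.length >= declared.length - 1 &&
    declared.init.zipWithIndex.forall { case (clazz, i) =>
      parameterTypeEquals(actual(i), clazz)
    } &&
    actual.drop(declared.length - 1).forall { clazz =>
      parameterTypeEquals(clazz, declared.last.getComponentType)
    }
```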
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102628665 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala --- @@ -140,6 +147,25 @@ object UserDefinedFunctionUtils { s"one method named 'eval' which is public, not abstract and " + s"(in case of table functions) not static.") } else { + var trailingSeq = false + var trailingArray = false + methods.foreach(method => { +val signatures = method.getParameterTypes +if (signatures.nonEmpty) { + val trailingArg = signatures(signatures.length - 1) + if (trailingArg.getName.equals("scala.collection.Seq")) { +trailingSeq = true + } else if (trailingArg.isArray) { +trailingArray = true + } +} + }) + if (trailingSeq && !trailingArray) { +// We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..." +throw new ValidationException("The 'eval' method do not support Scala type of " + + "variable args eg. scala.collection.Seq or Type*, please add a @varargs annotation " + --- End diff -- Sure. Thanks @wuchong
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102483144 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala --- @@ -136,8 +136,18 @@ object ScalarSqlFunction { } override def getOperandCountRange: SqlOperandCountRange = { -val signatureLengths = signatures.map(_.length) -SqlOperandCountRanges.between(signatureLengths.min, signatureLengths.max) +var min = 255 +var max = -1 +signatures.foreach(sig => { + var len = sig.length + if (len > 0 && sig(sig.length -1).isArray) { +max = 254 // according to JVM spec 4.3.3 --- End diff -- Hi @wuchong. Yes, according to the JVM specification a method may have up to 255 parameters if it is static. If the method is not static, the implicit "this" reference takes one of the parameter slots, so the limit is 254. FYI: http://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.3.3
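A hedged sketch of an operand count range computation along these lines, assuming Calcite's `SqlOperandCountRanges.between` and the 254-argument ceiling discussed above (helper name and shape are illustrative):

```scala
import org.apache.calcite.sql.SqlOperandCountRange
import org.apache.calcite.sql.type.SqlOperandCountRanges

// If any signature ends in an array (a varargs method), the upper bound opens up
// to 254: the JVM allows 255 parameter slots and the implicit `this` of an
// instance method takes one of them. A varargs signature's lower bound is the
// number of its fixed parameters.
def operandCountRange(signatures: Seq[Array[Class[_]]]): SqlOperandCountRange = {
  val bounds = signatures.map { sig =>
    if (sig.nonEmpty && sig.last.isArray) (sig.length - 1, 254) else (sig.length, sig.length)
  }
  SqlOperandCountRanges.between(bounds.map(_._1).min, bounds.map(_._2).max)
}
```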
[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
GitHub user clarkyzl opened a pull request: https://github.com/apache/flink/pull/3389 [FLINK-5881] [table] ScalarFunction(UDF) should support variable types and variable arguments
Type: New Feature
Priority: Major
Components: table, udf, ScalarFunction
Problem Definition: [FLINK-5881] [table] ScalarFunction(UDF) should support variable types and variable arguments
Design:
1. Modified the getSignature() method in UserDefinedFunctionUtils so that the trailing variable-argument style can be matched. Signatures such as "(TypeA a, TypeB b, TypeC... c)" or "(a: TypeA, b: TypeB, c: TypeC*)" with the annotation will now pass the signature lookup.
2. Modified the SqlOperandTypeChecker to make the SQL operand count range flexible, so the SQL node passes Calcite's validation.
3. Modified the checkAndExtractEvalMethods() method to throw a human-readable ValidationException if the user declared variable arguments in Scala but forgot to add the "@varargs" annotation. Please see the discussion in FLINK-5826.
Impact Analysis: It's a minor modification introducing a new feature, with minimal impact on UDFs.
Test: Added both Scala and Java tests for all APIs.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/clarkyzl/flink flink-5881
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3389.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3389
commit 60b68fdd66f8021f6f090e7372987d43362d5ef3 Author: Zhuoluo Yang Date: 2017-02-22T10:53:34Z [FLINK-5881] [table] ScalarFunction(UDF) should support variable types and variable arguments
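To make the feature concrete, a hedged end-to-end sketch of how such a varargs ScalarFunction might be defined and used with the Scala batch Table API of that era (the UDF, table, and field names are made up; registration is only needed for SQL and expression-string calls):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.table.functions.ScalarFunction
import scala.annotation.varargs

// A varargs UDF: joins any number of words with a separator.
object JoinWords extends ScalarFunction {
  @varargs
  def eval(separator: String, words: String*): String = words.mkString(separator)
}

object VarargsUdfExample {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Registration makes the function callable from SQL / expression strings.
    tEnv.registerFunction("JoinWords", JoinWords)

    val table = env.fromElements(("hello", "world")).toTable(tEnv, 'a, 'b)

    // Scala Table API call: the trailing arguments feed the varargs parameter.
    val result = table.select(JoinWords("-", 'a, 'b))
    result.toDataSet[String].print()
  }
}
```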
[GitHub] flink issue #3292: [FLINK-5739] [client] fix NullPointerException in CliFron...
Github user clarkyzl commented on the issue: https://github.com/apache/flink/pull/3292 Thanks a lot
[GitHub] flink pull request #3292: [FLINK-5739] [client] fix NullPointerException in ...
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3292#discussion_r100945289 --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java --- @@ -842,6 +842,12 @@ protected int executeProgram(PackagedProgram program, ClusterClient client, int program.deleteExtractedLibraries(); } + if (null == result) { + logAndSysout("No JobSubmissionResult returned, please make sure you called " + --- End diff -- I added a space here; I'll try again.
[GitHub] flink pull request #3292: [FLINK-5739] [client] fix NullPointerException in ...
GitHub user clarkyzl opened a pull request: https://github.com/apache/flink/pull/3292 [FLINK-5739] [client] fix NullPointerException in CliFrontend
Type: Bug
Priority: Major
Problem Definition: CliFrontend throws a NullPointerException
Design: see https://issues.apache.org/jira/browse/FLINK-5739 The client throws a NullPointerException if the user forgot to call the "execute()" method, and the user may be confused by it. This patch adds a message here and makes the failure meaningful.
Impact Analysis: Only the client and its messages are affected.
Test: mvn clean verify has been run.
You can merge this pull request into a Git repository by running: $ git pull https://github.com/clarkyzl/flink master
Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3292.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3292
commit 2921e4170502f54e1c2fb11e22cb2169deebe78e Author: Zhuoluo Yang Date: 2017-02-10T07:22:50Z [FLINK-5739] [client] fix NullPointerException in CliFrontend
commit db035643c782fb65c26efd10c8bf36d4173103f0 Author: Zhuoluo Yang Date: 2017-02-10T07:33:35Z [FLINK-5739] [client] fix code style
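For illustration, a hedged Scala sketch of the user mistake that triggers this code path: the program defines a sink but never calls execute(), so no job is submitted and the CLI previously failed with a bare NullPointerException (job and path names are made up):

```scala
import org.apache.flink.api.scala._

object ForgottenExecuteJob {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    env.fromElements(1, 2, 3)
      .map(_ * 2)
      .writeAsText("/tmp/out")   // sinks are lazy: nothing runs yet

    // Missing: env.execute("forgotten execute example")
    // Without it, no job is submitted and the CLI receives no JobSubmissionResult;
    // before this patch that surfaced as a NullPointerException in CliFrontend.
  }
}
```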